mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-23 06:30:05 +00:00
Compare commits
24 Commits
docs/vecto
...
feat/scann
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
59ddfa84ec | ||
|
|
dd043eadc4 | ||
|
|
7e6af2c7ee | ||
|
|
87d3b17f4d | ||
|
|
5acac3d403 | ||
|
|
f9c66ba0de | ||
|
|
37847a8df6 | ||
|
|
6e06ac9e5c | ||
|
|
09effc8128 | ||
|
|
c14728e3ae | ||
|
|
cce4d56e00 | ||
|
|
69cf13b33a | ||
|
|
c83a282b39 | ||
|
|
5329efcdba | ||
|
|
50b5c90d53 | ||
|
|
fea2966dec | ||
|
|
e00452c4db | ||
|
|
7a31b2a8ea | ||
|
|
f363d73f72 | ||
|
|
7a6befcad3 | ||
|
|
d6c75ec55f | ||
|
|
5b8f1d819f | ||
|
|
b68286e8af | ||
|
|
4519607bc6 |
@@ -21,7 +21,7 @@ use itertools::Itertools;
|
||||
|
||||
use crate::Bytes;
|
||||
use crate::bloom_filter::error::Result;
|
||||
use crate::bloom_filter::reader::BloomFilterReader;
|
||||
use crate::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader};
|
||||
|
||||
/// `InListPredicate` contains a list of acceptable values. A value needs to match at least
|
||||
/// one of the elements (logical OR semantic) for the predicate to be satisfied.
|
||||
@@ -38,7 +38,7 @@ pub struct BloomFilterApplier {
|
||||
|
||||
impl BloomFilterApplier {
|
||||
pub async fn new(reader: Box<dyn BloomFilterReader + Send>) -> Result<Self> {
|
||||
let meta = reader.metadata().await?;
|
||||
let meta = reader.metadata(None).await?;
|
||||
|
||||
Ok(Self { reader, meta })
|
||||
}
|
||||
@@ -50,6 +50,7 @@ impl BloomFilterApplier {
|
||||
&mut self,
|
||||
predicates: &[InListPredicate],
|
||||
search_ranges: &[Range<usize>],
|
||||
metrics: Option<&mut BloomFilterReadMetrics>,
|
||||
) -> Result<Vec<Range<usize>>> {
|
||||
if predicates.is_empty() {
|
||||
// If no predicates, return empty result
|
||||
@@ -57,7 +58,7 @@ impl BloomFilterApplier {
|
||||
}
|
||||
|
||||
let segments = self.row_ranges_to_segments(search_ranges);
|
||||
let (seg_locations, bloom_filters) = self.load_bloom_filters(&segments).await?;
|
||||
let (seg_locations, bloom_filters) = self.load_bloom_filters(&segments, metrics).await?;
|
||||
let matching_row_ranges = self.find_matching_rows(seg_locations, bloom_filters, predicates);
|
||||
Ok(intersect_ranges(search_ranges, &matching_row_ranges))
|
||||
}
|
||||
@@ -95,6 +96,7 @@ impl BloomFilterApplier {
|
||||
async fn load_bloom_filters(
|
||||
&mut self,
|
||||
segments: &[usize],
|
||||
metrics: Option<&mut BloomFilterReadMetrics>,
|
||||
) -> Result<(Vec<(u64, usize)>, Vec<BloomFilter>)> {
|
||||
let segment_locations = segments
|
||||
.iter()
|
||||
@@ -108,7 +110,10 @@ impl BloomFilterApplier {
|
||||
.map(|i| self.meta.bloom_filter_locs[i as usize])
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let bloom_filters = self.reader.bloom_filter_vec(&bloom_filter_locs).await?;
|
||||
let bloom_filters = self
|
||||
.reader
|
||||
.bloom_filter_vec(&bloom_filter_locs, metrics)
|
||||
.await?;
|
||||
|
||||
Ok((segment_locations, bloom_filters))
|
||||
}
|
||||
@@ -422,7 +427,10 @@ mod tests {
|
||||
];
|
||||
|
||||
for (predicates, search_range, expected) in cases {
|
||||
let result = applier.search(&predicates, &[search_range]).await.unwrap();
|
||||
let result = applier
|
||||
.search(&predicates, &[search_range], None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
result, expected,
|
||||
"Expected {:?}, got {:?}",
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::ops::{Range, Rem};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use bytemuck::try_cast_slice;
|
||||
@@ -34,6 +35,32 @@ const BLOOM_META_LEN_SIZE: u64 = 4;
|
||||
/// Default prefetch size of bloom filter meta.
|
||||
pub const DEFAULT_PREFETCH_SIZE: u64 = 8192; // 8KiB
|
||||
|
||||
/// Metrics for bloom filter read operations.
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct BloomFilterReadMetrics {
|
||||
/// Total byte size to read.
|
||||
pub total_bytes: u64,
|
||||
/// Total number of ranges to read.
|
||||
pub total_ranges: usize,
|
||||
/// Elapsed time to fetch data.
|
||||
pub fetch_elapsed: Duration,
|
||||
/// Number of cache hits.
|
||||
pub cache_hit: usize,
|
||||
/// Number of cache misses.
|
||||
pub cache_miss: usize,
|
||||
}
|
||||
|
||||
impl BloomFilterReadMetrics {
|
||||
/// Merges another metrics into this one.
|
||||
pub fn merge_from(&mut self, other: &Self) {
|
||||
self.total_bytes += other.total_bytes;
|
||||
self.total_ranges += other.total_ranges;
|
||||
self.fetch_elapsed += other.fetch_elapsed;
|
||||
self.cache_hit += other.cache_hit;
|
||||
self.cache_miss += other.cache_miss;
|
||||
}
|
||||
}
|
||||
|
||||
/// Safely converts bytes to Vec<u64> using bytemuck for optimal performance.
|
||||
/// Faster than chunking and converting each piece individually.
|
||||
///
|
||||
@@ -79,25 +106,52 @@ pub fn bytes_to_u64_vec(bytes: &Bytes) -> Vec<u64> {
|
||||
#[async_trait]
|
||||
pub trait BloomFilterReader: Sync {
|
||||
/// Reads range of bytes from the file.
|
||||
async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes>;
|
||||
async fn range_read(
|
||||
&self,
|
||||
offset: u64,
|
||||
size: u32,
|
||||
metrics: Option<&mut BloomFilterReadMetrics>,
|
||||
) -> Result<Bytes>;
|
||||
|
||||
/// Reads bunch of ranges from the file.
|
||||
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
|
||||
async fn read_vec(
|
||||
&self,
|
||||
ranges: &[Range<u64>],
|
||||
metrics: Option<&mut BloomFilterReadMetrics>,
|
||||
) -> Result<Vec<Bytes>> {
|
||||
let start = metrics.as_ref().map(|_| Instant::now());
|
||||
|
||||
let mut results = Vec::with_capacity(ranges.len());
|
||||
for range in ranges {
|
||||
let size = (range.end - range.start) as u32;
|
||||
let data = self.range_read(range.start, size).await?;
|
||||
let data = self.range_read(range.start, size, None).await?;
|
||||
results.push(data);
|
||||
}
|
||||
|
||||
if let Some(m) = metrics {
|
||||
m.total_ranges += ranges.len();
|
||||
m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::<u64>();
|
||||
if let Some(start) = start {
|
||||
m.fetch_elapsed += start.elapsed();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
/// Reads the meta information of the bloom filter.
|
||||
async fn metadata(&self) -> Result<BloomFilterMeta>;
|
||||
async fn metadata(
|
||||
&self,
|
||||
metrics: Option<&mut BloomFilterReadMetrics>,
|
||||
) -> Result<BloomFilterMeta>;
|
||||
|
||||
/// Reads a bloom filter with the given location.
|
||||
async fn bloom_filter(&self, loc: &BloomFilterLoc) -> Result<BloomFilter> {
|
||||
let bytes = self.range_read(loc.offset, loc.size as _).await?;
|
||||
async fn bloom_filter(
|
||||
&self,
|
||||
loc: &BloomFilterLoc,
|
||||
metrics: Option<&mut BloomFilterReadMetrics>,
|
||||
) -> Result<BloomFilter> {
|
||||
let bytes = self.range_read(loc.offset, loc.size as _, metrics).await?;
|
||||
let vec = bytes_to_u64_vec(&bytes);
|
||||
let bm = BloomFilter::from_vec(vec)
|
||||
.seed(&SEED)
|
||||
@@ -105,12 +159,16 @@ pub trait BloomFilterReader: Sync {
|
||||
Ok(bm)
|
||||
}
|
||||
|
||||
async fn bloom_filter_vec(&self, locs: &[BloomFilterLoc]) -> Result<Vec<BloomFilter>> {
|
||||
async fn bloom_filter_vec(
|
||||
&self,
|
||||
locs: &[BloomFilterLoc],
|
||||
metrics: Option<&mut BloomFilterReadMetrics>,
|
||||
) -> Result<Vec<BloomFilter>> {
|
||||
let ranges = locs
|
||||
.iter()
|
||||
.map(|l| l.offset..l.offset + l.size)
|
||||
.collect::<Vec<_>>();
|
||||
let bss = self.read_vec(&ranges).await?;
|
||||
let bss = self.read_vec(&ranges, metrics).await?;
|
||||
|
||||
let mut result = Vec::with_capacity(bss.len());
|
||||
for (bs, loc) in bss.into_iter().zip(locs.iter()) {
|
||||
@@ -140,24 +198,59 @@ impl<R: RangeReader> BloomFilterReaderImpl<R> {
|
||||
|
||||
#[async_trait]
|
||||
impl<R: RangeReader> BloomFilterReader for BloomFilterReaderImpl<R> {
|
||||
async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes> {
|
||||
self.reader
|
||||
async fn range_read(
|
||||
&self,
|
||||
offset: u64,
|
||||
size: u32,
|
||||
metrics: Option<&mut BloomFilterReadMetrics>,
|
||||
) -> Result<Bytes> {
|
||||
let start = metrics.as_ref().map(|_| Instant::now());
|
||||
let result = self
|
||||
.reader
|
||||
.read(offset..offset + size as u64)
|
||||
.await
|
||||
.context(IoSnafu)
|
||||
.context(IoSnafu)?;
|
||||
|
||||
if let Some(m) = metrics {
|
||||
m.total_ranges += 1;
|
||||
m.total_bytes += size as u64;
|
||||
if let Some(start) = start {
|
||||
m.fetch_elapsed += start.elapsed();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
|
||||
self.reader.read_vec(ranges).await.context(IoSnafu)
|
||||
async fn read_vec(
|
||||
&self,
|
||||
ranges: &[Range<u64>],
|
||||
metrics: Option<&mut BloomFilterReadMetrics>,
|
||||
) -> Result<Vec<Bytes>> {
|
||||
let start = metrics.as_ref().map(|_| Instant::now());
|
||||
let result = self.reader.read_vec(ranges).await.context(IoSnafu)?;
|
||||
|
||||
if let Some(m) = metrics {
|
||||
m.total_ranges += ranges.len();
|
||||
m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::<u64>();
|
||||
if let Some(start) = start {
|
||||
m.fetch_elapsed += start.elapsed();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn metadata(&self) -> Result<BloomFilterMeta> {
|
||||
async fn metadata(
|
||||
&self,
|
||||
metrics: Option<&mut BloomFilterReadMetrics>,
|
||||
) -> Result<BloomFilterMeta> {
|
||||
let metadata = self.reader.metadata().await.context(IoSnafu)?;
|
||||
let file_size = metadata.content_length;
|
||||
|
||||
let mut meta_reader =
|
||||
BloomFilterMetaReader::new(&self.reader, file_size, Some(DEFAULT_PREFETCH_SIZE));
|
||||
meta_reader.metadata().await
|
||||
meta_reader.metadata(metrics).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -183,7 +276,10 @@ impl<R: RangeReader> BloomFilterMetaReader<R> {
|
||||
///
|
||||
/// It will first prefetch some bytes from the end of the file,
|
||||
/// then parse the metadata from the prefetch bytes.
|
||||
pub async fn metadata(&mut self) -> Result<BloomFilterMeta> {
|
||||
pub async fn metadata(
|
||||
&mut self,
|
||||
metrics: Option<&mut BloomFilterReadMetrics>,
|
||||
) -> Result<BloomFilterMeta> {
|
||||
ensure!(
|
||||
self.file_size >= BLOOM_META_LEN_SIZE,
|
||||
FileSizeTooSmallSnafu {
|
||||
@@ -191,6 +287,7 @@ impl<R: RangeReader> BloomFilterMetaReader<R> {
|
||||
}
|
||||
);
|
||||
|
||||
let start = metrics.as_ref().map(|_| Instant::now());
|
||||
let meta_start = self.file_size.saturating_sub(self.prefetch_size);
|
||||
let suffix = self
|
||||
.reader
|
||||
@@ -208,8 +305,28 @@ impl<R: RangeReader> BloomFilterMetaReader<R> {
|
||||
.read(metadata_start..self.file_size - BLOOM_META_LEN_SIZE)
|
||||
.await
|
||||
.context(IoSnafu)?;
|
||||
|
||||
if let Some(m) = metrics {
|
||||
// suffix read + meta read
|
||||
m.total_ranges += 2;
|
||||
// Ignores the meta length size to simplify the calculation.
|
||||
m.total_bytes += self.file_size.min(self.prefetch_size) + length;
|
||||
if let Some(start) = start {
|
||||
m.fetch_elapsed += start.elapsed();
|
||||
}
|
||||
}
|
||||
|
||||
BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
|
||||
} else {
|
||||
if let Some(m) = metrics {
|
||||
// suffix read only
|
||||
m.total_ranges += 1;
|
||||
m.total_bytes += self.file_size.min(self.prefetch_size);
|
||||
if let Some(start) = start {
|
||||
m.fetch_elapsed += start.elapsed();
|
||||
}
|
||||
}
|
||||
|
||||
let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE - meta_start;
|
||||
let meta = &suffix[metadata_start as usize..suffix_len - BLOOM_META_LEN_SIZE as usize];
|
||||
BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
|
||||
@@ -290,7 +407,7 @@ mod tests {
|
||||
for prefetch in [0u64, file_size / 2, file_size, file_size + 10] {
|
||||
let mut reader =
|
||||
BloomFilterMetaReader::new(bytes.clone(), file_size as _, Some(prefetch));
|
||||
let meta = reader.metadata().await.unwrap();
|
||||
let meta = reader.metadata(None).await.unwrap();
|
||||
|
||||
assert_eq!(meta.rows_per_segment, 2);
|
||||
assert_eq!(meta.segment_count, 2);
|
||||
@@ -312,11 +429,11 @@ mod tests {
|
||||
let bytes = mock_bloom_filter_bytes().await;
|
||||
|
||||
let reader = BloomFilterReaderImpl::new(bytes);
|
||||
let meta = reader.metadata().await.unwrap();
|
||||
let meta = reader.metadata(None).await.unwrap();
|
||||
|
||||
assert_eq!(meta.bloom_filter_locs.len(), 2);
|
||||
let bf = reader
|
||||
.bloom_filter(&meta.bloom_filter_locs[0])
|
||||
.bloom_filter(&meta.bloom_filter_locs[0], None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(bf.contains(&b"a"));
|
||||
@@ -325,7 +442,7 @@ mod tests {
|
||||
assert!(bf.contains(&b"d"));
|
||||
|
||||
let bf = reader
|
||||
.bloom_filter(&meta.bloom_filter_locs[1])
|
||||
.bloom_filter(&meta.bloom_filter_locs[1], None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(bf.contains(&b"e"));
|
||||
|
||||
@@ -74,7 +74,7 @@ async fn test_search(
|
||||
writer.finish().await.unwrap();
|
||||
|
||||
let reader = puffin_manager.reader(&file_name).await.unwrap();
|
||||
let index_dir = reader.dir(&blob_key).await.unwrap();
|
||||
let (index_dir, _metrics) = reader.dir(&blob_key).await.unwrap();
|
||||
let searcher = TantivyFulltextIndexSearcher::new(index_dir.path(), config).unwrap();
|
||||
for (query, expected) in query_expected {
|
||||
let results = searcher.search(query).await.unwrap();
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
use std::collections::VecDeque;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
@@ -29,19 +30,59 @@ pub use crate::inverted_index::format::reader::blob::InvertedIndexBlobReader;
|
||||
mod blob;
|
||||
mod footer;
|
||||
|
||||
/// Metrics for inverted index read operations.
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct InvertedIndexReadMetrics {
|
||||
/// Total byte size to read.
|
||||
pub total_bytes: u64,
|
||||
/// Total number of ranges to read.
|
||||
pub total_ranges: usize,
|
||||
/// Elapsed time to fetch data.
|
||||
pub fetch_elapsed: Duration,
|
||||
/// Number of cache hits.
|
||||
pub cache_hit: usize,
|
||||
/// Number of cache misses.
|
||||
pub cache_miss: usize,
|
||||
}
|
||||
|
||||
impl InvertedIndexReadMetrics {
|
||||
/// Merges another metrics into this one.
|
||||
pub fn merge_from(&mut self, other: &Self) {
|
||||
self.total_bytes += other.total_bytes;
|
||||
self.total_ranges += other.total_ranges;
|
||||
self.fetch_elapsed += other.fetch_elapsed;
|
||||
self.cache_hit += other.cache_hit;
|
||||
self.cache_miss += other.cache_miss;
|
||||
}
|
||||
}
|
||||
|
||||
/// InvertedIndexReader defines an asynchronous reader of inverted index data
|
||||
#[mockall::automock]
|
||||
#[async_trait]
|
||||
pub trait InvertedIndexReader: Send + Sync {
|
||||
/// Seeks to given offset and reads data with exact size as provided.
|
||||
async fn range_read(&self, offset: u64, size: u32) -> Result<Vec<u8>>;
|
||||
async fn range_read<'a>(
|
||||
&self,
|
||||
offset: u64,
|
||||
size: u32,
|
||||
metrics: Option<&'a mut InvertedIndexReadMetrics>,
|
||||
) -> Result<Vec<u8>>;
|
||||
|
||||
/// Reads the bytes in the given ranges.
|
||||
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
|
||||
async fn read_vec<'a>(
|
||||
&self,
|
||||
ranges: &[Range<u64>],
|
||||
metrics: Option<&'a mut InvertedIndexReadMetrics>,
|
||||
) -> Result<Vec<Bytes>> {
|
||||
let mut metrics = metrics;
|
||||
let mut result = Vec::with_capacity(ranges.len());
|
||||
for range in ranges {
|
||||
let data = self
|
||||
.range_read(range.start, (range.end - range.start) as u32)
|
||||
.range_read(
|
||||
range.start,
|
||||
(range.end - range.start) as u32,
|
||||
metrics.as_deref_mut(),
|
||||
)
|
||||
.await?;
|
||||
result.push(Bytes::from(data));
|
||||
}
|
||||
@@ -49,17 +90,29 @@ pub trait InvertedIndexReader: Send + Sync {
|
||||
}
|
||||
|
||||
/// Retrieves metadata of all inverted indices stored within the blob.
|
||||
async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>>;
|
||||
async fn metadata<'a>(
|
||||
&self,
|
||||
metrics: Option<&'a mut InvertedIndexReadMetrics>,
|
||||
) -> Result<Arc<InvertedIndexMetas>>;
|
||||
|
||||
/// Retrieves the finite state transducer (FST) map from the given offset and size.
|
||||
async fn fst(&self, offset: u64, size: u32) -> Result<FstMap> {
|
||||
let fst_data = self.range_read(offset, size).await?;
|
||||
async fn fst<'a>(
|
||||
&self,
|
||||
offset: u64,
|
||||
size: u32,
|
||||
metrics: Option<&'a mut InvertedIndexReadMetrics>,
|
||||
) -> Result<FstMap> {
|
||||
let fst_data = self.range_read(offset, size, metrics).await?;
|
||||
FstMap::new(fst_data).context(DecodeFstSnafu)
|
||||
}
|
||||
|
||||
/// Retrieves the multiple finite state transducer (FST) maps from the given ranges.
|
||||
async fn fst_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<FstMap>> {
|
||||
self.read_vec(ranges)
|
||||
async fn fst_vec<'a>(
|
||||
&mut self,
|
||||
ranges: &[Range<u64>],
|
||||
metrics: Option<&'a mut InvertedIndexReadMetrics>,
|
||||
) -> Result<Vec<FstMap>> {
|
||||
self.read_vec(ranges, metrics)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|bytes| FstMap::new(bytes.to_vec()).context(DecodeFstSnafu))
|
||||
@@ -67,19 +120,28 @@ pub trait InvertedIndexReader: Send + Sync {
|
||||
}
|
||||
|
||||
/// Retrieves the bitmap from the given offset and size.
|
||||
async fn bitmap(&self, offset: u64, size: u32, bitmap_type: BitmapType) -> Result<Bitmap> {
|
||||
self.range_read(offset, size).await.and_then(|bytes| {
|
||||
Bitmap::deserialize_from(&bytes, bitmap_type).context(DecodeBitmapSnafu)
|
||||
})
|
||||
async fn bitmap<'a>(
|
||||
&self,
|
||||
offset: u64,
|
||||
size: u32,
|
||||
bitmap_type: BitmapType,
|
||||
metrics: Option<&'a mut InvertedIndexReadMetrics>,
|
||||
) -> Result<Bitmap> {
|
||||
self.range_read(offset, size, metrics)
|
||||
.await
|
||||
.and_then(|bytes| {
|
||||
Bitmap::deserialize_from(&bytes, bitmap_type).context(DecodeBitmapSnafu)
|
||||
})
|
||||
}
|
||||
|
||||
/// Retrieves the multiple bitmaps from the given ranges.
|
||||
async fn bitmap_deque(
|
||||
async fn bitmap_deque<'a>(
|
||||
&mut self,
|
||||
ranges: &[(Range<u64>, BitmapType)],
|
||||
metrics: Option<&'a mut InvertedIndexReadMetrics>,
|
||||
) -> Result<VecDeque<Bitmap>> {
|
||||
let (ranges, types): (Vec<_>, Vec<_>) = ranges.iter().cloned().unzip();
|
||||
let bytes = self.read_vec(&ranges).await?;
|
||||
let bytes = self.read_vec(&ranges, metrics).await?;
|
||||
bytes
|
||||
.into_iter()
|
||||
.zip(types)
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
@@ -23,10 +24,10 @@ use snafu::{ResultExt, ensure};
|
||||
|
||||
use crate::inverted_index::error::{CommonIoSnafu, Result, UnexpectedBlobSizeSnafu};
|
||||
use crate::inverted_index::format::MIN_BLOB_SIZE;
|
||||
use crate::inverted_index::format::reader::InvertedIndexReader;
|
||||
use crate::inverted_index::format::reader::footer::{
|
||||
DEFAULT_PREFETCH_SIZE, InvertedIndexFooterReader,
|
||||
};
|
||||
use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
|
||||
|
||||
/// Inverted index blob reader, implements [`InvertedIndexReader`]
|
||||
pub struct InvertedIndexBlobReader<R> {
|
||||
@@ -53,27 +54,58 @@ impl<R> InvertedIndexBlobReader<R> {
|
||||
|
||||
#[async_trait]
|
||||
impl<R: RangeReader + Sync> InvertedIndexReader for InvertedIndexBlobReader<R> {
|
||||
async fn range_read(&self, offset: u64, size: u32) -> Result<Vec<u8>> {
|
||||
async fn range_read<'a>(
|
||||
&self,
|
||||
offset: u64,
|
||||
size: u32,
|
||||
metrics: Option<&'a mut InvertedIndexReadMetrics>,
|
||||
) -> Result<Vec<u8>> {
|
||||
let start = metrics.as_ref().map(|_| Instant::now());
|
||||
|
||||
let buf = self
|
||||
.source
|
||||
.read(offset..offset + size as u64)
|
||||
.await
|
||||
.context(CommonIoSnafu)?;
|
||||
|
||||
if let Some(m) = metrics {
|
||||
m.total_bytes += size as u64;
|
||||
m.total_ranges += 1;
|
||||
m.fetch_elapsed += start.unwrap().elapsed();
|
||||
}
|
||||
|
||||
Ok(buf.into())
|
||||
}
|
||||
|
||||
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
|
||||
self.source.read_vec(ranges).await.context(CommonIoSnafu)
|
||||
async fn read_vec<'a>(
|
||||
&self,
|
||||
ranges: &[Range<u64>],
|
||||
metrics: Option<&'a mut InvertedIndexReadMetrics>,
|
||||
) -> Result<Vec<Bytes>> {
|
||||
let start = metrics.as_ref().map(|_| Instant::now());
|
||||
|
||||
let result = self.source.read_vec(ranges).await.context(CommonIoSnafu)?;
|
||||
|
||||
if let Some(m) = metrics {
|
||||
m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::<u64>();
|
||||
m.total_ranges += ranges.len();
|
||||
m.fetch_elapsed += start.unwrap().elapsed();
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>> {
|
||||
async fn metadata<'a>(
|
||||
&self,
|
||||
metrics: Option<&'a mut InvertedIndexReadMetrics>,
|
||||
) -> Result<Arc<InvertedIndexMetas>> {
|
||||
let metadata = self.source.metadata().await.context(CommonIoSnafu)?;
|
||||
let blob_size = metadata.content_length;
|
||||
Self::validate_blob_size(blob_size)?;
|
||||
|
||||
let mut footer_reader = InvertedIndexFooterReader::new(&self.source, blob_size)
|
||||
.with_prefetch_size(DEFAULT_PREFETCH_SIZE);
|
||||
footer_reader.metadata().await.map(Arc::new)
|
||||
footer_reader.metadata(metrics).await.map(Arc::new)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,7 +205,7 @@ mod tests {
|
||||
let blob = create_inverted_index_blob();
|
||||
let blob_reader = InvertedIndexBlobReader::new(blob);
|
||||
|
||||
let metas = blob_reader.metadata().await.unwrap();
|
||||
let metas = blob_reader.metadata(None).await.unwrap();
|
||||
assert_eq!(metas.metas.len(), 2);
|
||||
|
||||
let meta0 = metas.metas.get("tag0").unwrap();
|
||||
@@ -200,13 +232,14 @@ mod tests {
|
||||
let blob = create_inverted_index_blob();
|
||||
let blob_reader = InvertedIndexBlobReader::new(blob);
|
||||
|
||||
let metas = blob_reader.metadata().await.unwrap();
|
||||
let metas = blob_reader.metadata(None).await.unwrap();
|
||||
let meta = metas.metas.get("tag0").unwrap();
|
||||
|
||||
let fst_map = blob_reader
|
||||
.fst(
|
||||
meta.base_offset + meta.relative_fst_offset as u64,
|
||||
meta.fst_size,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -219,6 +252,7 @@ mod tests {
|
||||
.fst(
|
||||
meta.base_offset + meta.relative_fst_offset as u64,
|
||||
meta.fst_size,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -232,30 +266,30 @@ mod tests {
|
||||
let blob = create_inverted_index_blob();
|
||||
let blob_reader = InvertedIndexBlobReader::new(blob);
|
||||
|
||||
let metas = blob_reader.metadata().await.unwrap();
|
||||
let metas = blob_reader.metadata(None).await.unwrap();
|
||||
let meta = metas.metas.get("tag0").unwrap();
|
||||
|
||||
let bitmap = blob_reader
|
||||
.bitmap(meta.base_offset, 26, BitmapType::Roaring)
|
||||
.bitmap(meta.base_offset, 26, BitmapType::Roaring, None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(bitmap, mock_bitmap());
|
||||
let bitmap = blob_reader
|
||||
.bitmap(meta.base_offset + 26, 26, BitmapType::Roaring)
|
||||
.bitmap(meta.base_offset + 26, 26, BitmapType::Roaring, None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(bitmap, mock_bitmap());
|
||||
|
||||
let metas = blob_reader.metadata().await.unwrap();
|
||||
let metas = blob_reader.metadata(None).await.unwrap();
|
||||
let meta = metas.metas.get("tag1").unwrap();
|
||||
|
||||
let bitmap = blob_reader
|
||||
.bitmap(meta.base_offset, 26, BitmapType::Roaring)
|
||||
.bitmap(meta.base_offset, 26, BitmapType::Roaring, None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(bitmap, mock_bitmap());
|
||||
let bitmap = blob_reader
|
||||
.bitmap(meta.base_offset + 26, 26, BitmapType::Roaring)
|
||||
.bitmap(meta.base_offset + 26, 26, BitmapType::Roaring, None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(bitmap, mock_bitmap());
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::time::Instant;
|
||||
|
||||
use common_base::range_read::RangeReader;
|
||||
use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
|
||||
use prost::Message;
|
||||
@@ -23,6 +25,7 @@ use crate::inverted_index::error::{
|
||||
UnexpectedZeroSegmentRowCountSnafu,
|
||||
};
|
||||
use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE;
|
||||
use crate::inverted_index::format::reader::InvertedIndexReadMetrics;
|
||||
|
||||
pub const DEFAULT_PREFETCH_SIZE: u64 = 8192; // 8KiB
|
||||
|
||||
@@ -54,12 +57,17 @@ impl<R> InvertedIndexFooterReader<R> {
|
||||
}
|
||||
|
||||
impl<R: RangeReader> InvertedIndexFooterReader<R> {
|
||||
pub async fn metadata(&mut self) -> Result<InvertedIndexMetas> {
|
||||
pub async fn metadata(
|
||||
&mut self,
|
||||
mut metrics: Option<&mut InvertedIndexReadMetrics>,
|
||||
) -> Result<InvertedIndexMetas> {
|
||||
ensure!(
|
||||
self.blob_size >= FOOTER_PAYLOAD_SIZE_SIZE,
|
||||
BlobSizeTooSmallSnafu
|
||||
);
|
||||
|
||||
let start = metrics.as_ref().map(|_| Instant::now());
|
||||
|
||||
let footer_start = self.blob_size.saturating_sub(self.prefetch_size());
|
||||
let suffix = self
|
||||
.source
|
||||
@@ -73,19 +81,36 @@ impl<R: RangeReader> InvertedIndexFooterReader<R> {
|
||||
let footer_size = FOOTER_PAYLOAD_SIZE_SIZE;
|
||||
|
||||
// Did not fetch the entire file metadata in the initial read, need to make a second request.
|
||||
if length > suffix_len as u64 - footer_size {
|
||||
let result = if length > suffix_len as u64 - footer_size {
|
||||
let metadata_start = self.blob_size - length - footer_size;
|
||||
let meta = self
|
||||
.source
|
||||
.read(metadata_start..self.blob_size - footer_size)
|
||||
.await
|
||||
.context(CommonIoSnafu)?;
|
||||
|
||||
if let Some(m) = metrics.as_deref_mut() {
|
||||
m.total_bytes += self.blob_size.min(self.prefetch_size()) + length;
|
||||
m.total_ranges += 2;
|
||||
}
|
||||
|
||||
self.parse_payload(&meta, length)
|
||||
} else {
|
||||
if let Some(m) = metrics.as_deref_mut() {
|
||||
m.total_bytes += self.blob_size.min(self.prefetch_size());
|
||||
m.total_ranges += 1;
|
||||
}
|
||||
|
||||
let metadata_start = self.blob_size - length - footer_size - footer_start;
|
||||
let meta = &suffix[metadata_start as usize..suffix_len - footer_size as usize];
|
||||
self.parse_payload(meta, length)
|
||||
};
|
||||
|
||||
if let Some(m) = metrics {
|
||||
m.fetch_elapsed += start.unwrap().elapsed();
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> {
|
||||
@@ -186,7 +211,7 @@ mod tests {
|
||||
reader = reader.with_prefetch_size(prefetch);
|
||||
}
|
||||
|
||||
let metas = reader.metadata().await.unwrap();
|
||||
let metas = reader.metadata(None).await.unwrap();
|
||||
assert_eq!(metas.metas.len(), 1);
|
||||
let index_meta = &metas.metas.get("test").unwrap();
|
||||
assert_eq!(index_meta.name, "test");
|
||||
@@ -210,7 +235,7 @@ mod tests {
|
||||
reader = reader.with_prefetch_size(prefetch);
|
||||
}
|
||||
|
||||
let result = reader.metadata().await;
|
||||
let result = reader.metadata(None).await;
|
||||
assert_matches!(result, Err(Error::UnexpectedFooterPayloadSize { .. }));
|
||||
}
|
||||
}
|
||||
@@ -233,7 +258,7 @@ mod tests {
|
||||
reader = reader.with_prefetch_size(prefetch);
|
||||
}
|
||||
|
||||
let result = reader.metadata().await;
|
||||
let result = reader.metadata(None).await;
|
||||
assert_matches!(result, Err(Error::UnexpectedOffsetSize { .. }));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -122,7 +122,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let reader = InvertedIndexBlobReader::new(blob);
|
||||
let metadata = reader.metadata().await.unwrap();
|
||||
let metadata = reader.metadata(None).await.unwrap();
|
||||
assert_eq!(metadata.total_row_count, 8);
|
||||
assert_eq!(metadata.segment_row_count, 1);
|
||||
assert_eq!(metadata.metas.len(), 0);
|
||||
@@ -182,7 +182,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let reader = InvertedIndexBlobReader::new(blob);
|
||||
let metadata = reader.metadata().await.unwrap();
|
||||
let metadata = reader.metadata(None).await.unwrap();
|
||||
assert_eq!(metadata.total_row_count, 8);
|
||||
assert_eq!(metadata.segment_row_count, 1);
|
||||
assert_eq!(metadata.metas.len(), 2);
|
||||
@@ -198,13 +198,19 @@ mod tests {
|
||||
.fst(
|
||||
tag0.base_offset + tag0.relative_fst_offset as u64,
|
||||
tag0.fst_size,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(fst0.len(), 3);
|
||||
let [offset, size] = unpack(fst0.get(b"a").unwrap());
|
||||
let bitmap = reader
|
||||
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
|
||||
.bitmap(
|
||||
tag0.base_offset + offset as u64,
|
||||
size,
|
||||
BitmapType::Roaring,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
@@ -213,7 +219,12 @@ mod tests {
|
||||
);
|
||||
let [offset, size] = unpack(fst0.get(b"b").unwrap());
|
||||
let bitmap = reader
|
||||
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
|
||||
.bitmap(
|
||||
tag0.base_offset + offset as u64,
|
||||
size,
|
||||
BitmapType::Roaring,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
@@ -222,7 +233,12 @@ mod tests {
|
||||
);
|
||||
let [offset, size] = unpack(fst0.get(b"c").unwrap());
|
||||
let bitmap = reader
|
||||
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
|
||||
.bitmap(
|
||||
tag0.base_offset + offset as u64,
|
||||
size,
|
||||
BitmapType::Roaring,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
@@ -241,13 +257,19 @@ mod tests {
|
||||
.fst(
|
||||
tag1.base_offset + tag1.relative_fst_offset as u64,
|
||||
tag1.fst_size,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(fst1.len(), 3);
|
||||
let [offset, size] = unpack(fst1.get(b"x").unwrap());
|
||||
let bitmap = reader
|
||||
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
|
||||
.bitmap(
|
||||
tag1.base_offset + offset as u64,
|
||||
size,
|
||||
BitmapType::Roaring,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
@@ -256,7 +278,12 @@ mod tests {
|
||||
);
|
||||
let [offset, size] = unpack(fst1.get(b"y").unwrap());
|
||||
let bitmap = reader
|
||||
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
|
||||
.bitmap(
|
||||
tag1.base_offset + offset as u64,
|
||||
size,
|
||||
BitmapType::Roaring,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
@@ -265,7 +292,12 @@ mod tests {
|
||||
);
|
||||
let [offset, size] = unpack(fst1.get(b"z").unwrap());
|
||||
let bitmap = reader
|
||||
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
|
||||
.bitmap(
|
||||
tag1.base_offset + offset as u64,
|
||||
size,
|
||||
BitmapType::Roaring,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
|
||||
@@ -16,7 +16,7 @@ use greptime_proto::v1::index::{BitmapType, InvertedIndexMeta};
|
||||
|
||||
use crate::bitmap::Bitmap;
|
||||
use crate::inverted_index::error::Result;
|
||||
use crate::inverted_index::format::reader::InvertedIndexReader;
|
||||
use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
|
||||
|
||||
/// `ParallelFstValuesMapper` enables parallel mapping of multiple FST value groups to their
|
||||
/// corresponding bitmaps within an inverted index.
|
||||
@@ -35,7 +35,8 @@ impl<'a> ParallelFstValuesMapper<'a> {
|
||||
|
||||
pub async fn map_values_vec(
|
||||
&mut self,
|
||||
value_and_meta_vec: &[(Vec<u64>, &'a InvertedIndexMeta)],
|
||||
value_and_meta_vec: &[(Vec<u64>, &InvertedIndexMeta)],
|
||||
metrics: Option<&mut InvertedIndexReadMetrics>,
|
||||
) -> Result<Vec<Bitmap>> {
|
||||
let groups = value_and_meta_vec
|
||||
.iter()
|
||||
@@ -64,7 +65,7 @@ impl<'a> ParallelFstValuesMapper<'a> {
|
||||
}
|
||||
|
||||
common_telemetry::debug!("fetch ranges: {:?}", fetch_ranges);
|
||||
let mut bitmaps = self.reader.bitmap_deque(&fetch_ranges).await?;
|
||||
let mut bitmaps = self.reader.bitmap_deque(&fetch_ranges, metrics).await?;
|
||||
let mut output = Vec::with_capacity(groups.len());
|
||||
|
||||
for counter in groups {
|
||||
@@ -95,23 +96,25 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_map_values_vec() {
|
||||
let mut mock_reader = MockInvertedIndexReader::new();
|
||||
mock_reader.expect_bitmap_deque().returning(|ranges| {
|
||||
let mut output = VecDeque::new();
|
||||
for (range, bitmap_type) in ranges {
|
||||
let offset = range.start;
|
||||
let size = range.end - range.start;
|
||||
match (offset, size, bitmap_type) {
|
||||
(1, 1, BitmapType::Roaring) => {
|
||||
output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type))
|
||||
mock_reader
|
||||
.expect_bitmap_deque()
|
||||
.returning(|ranges, _metrics| {
|
||||
let mut output = VecDeque::new();
|
||||
for (range, bitmap_type) in ranges {
|
||||
let offset = range.start;
|
||||
let size = range.end - range.start;
|
||||
match (offset, size, bitmap_type) {
|
||||
(1, 1, BitmapType::Roaring) => {
|
||||
output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type))
|
||||
}
|
||||
(2, 1, BitmapType::Roaring) => {
|
||||
output.push_back(Bitmap::from_lsb0_bytes(&[0b01010101], *bitmap_type))
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
(2, 1, BitmapType::Roaring) => {
|
||||
output.push_back(Bitmap::from_lsb0_bytes(&[0b01010101], *bitmap_type))
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
Ok(output)
|
||||
});
|
||||
Ok(output)
|
||||
});
|
||||
|
||||
let meta = InvertedIndexMeta {
|
||||
bitmap_type: BitmapType::Roaring.into(),
|
||||
@@ -120,13 +123,13 @@ mod tests {
|
||||
let mut values_mapper = ParallelFstValuesMapper::new(&mut mock_reader);
|
||||
|
||||
let result = values_mapper
|
||||
.map_values_vec(&[(vec![], &meta)])
|
||||
.map_values_vec(&[(vec![], &meta)], None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result[0].count_ones(), 0);
|
||||
|
||||
let result = values_mapper
|
||||
.map_values_vec(&[(vec![value(1, 1)], &meta)])
|
||||
.map_values_vec(&[(vec![value(1, 1)], &meta)], None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
@@ -135,7 +138,7 @@ mod tests {
|
||||
);
|
||||
|
||||
let result = values_mapper
|
||||
.map_values_vec(&[(vec![value(2, 1)], &meta)])
|
||||
.map_values_vec(&[(vec![value(2, 1)], &meta)], None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
@@ -144,7 +147,7 @@ mod tests {
|
||||
);
|
||||
|
||||
let result = values_mapper
|
||||
.map_values_vec(&[(vec![value(1, 1), value(2, 1)], &meta)])
|
||||
.map_values_vec(&[(vec![value(1, 1), value(2, 1)], &meta)], None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
@@ -153,7 +156,7 @@ mod tests {
|
||||
);
|
||||
|
||||
let result = values_mapper
|
||||
.map_values_vec(&[(vec![value(2, 1), value(1, 1)], &meta)])
|
||||
.map_values_vec(&[(vec![value(2, 1), value(1, 1)], &meta)], None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
@@ -162,7 +165,10 @@ mod tests {
|
||||
);
|
||||
|
||||
let result = values_mapper
|
||||
.map_values_vec(&[(vec![value(2, 1)], &meta), (vec![value(1, 1)], &meta)])
|
||||
.map_values_vec(
|
||||
&[(vec![value(2, 1)], &meta), (vec![value(1, 1)], &meta)],
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
@@ -174,10 +180,13 @@ mod tests {
|
||||
Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring)
|
||||
);
|
||||
let result = values_mapper
|
||||
.map_values_vec(&[
|
||||
(vec![value(2, 1), value(1, 1)], &meta),
|
||||
(vec![value(1, 1)], &meta),
|
||||
])
|
||||
.map_values_vec(
|
||||
&[
|
||||
(vec![value(2, 1), value(1, 1)], &meta),
|
||||
(vec![value(1, 1)], &meta),
|
||||
],
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
|
||||
@@ -19,7 +19,7 @@ pub use predicates_apply::PredicatesIndexApplier;
|
||||
|
||||
use crate::bitmap::Bitmap;
|
||||
use crate::inverted_index::error::Result;
|
||||
use crate::inverted_index::format::reader::InvertedIndexReader;
|
||||
use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
|
||||
|
||||
/// The output of an apply operation.
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
@@ -44,10 +44,11 @@ pub trait IndexApplier: Send + Sync {
|
||||
/// Applies the predefined predicates to the data read by the given index reader, returning
|
||||
/// a list of relevant indices (e.g., post IDs, group IDs, row IDs).
|
||||
#[allow(unused_parens)]
|
||||
async fn apply<'a>(
|
||||
async fn apply<'a, 'b>(
|
||||
&self,
|
||||
context: SearchContext,
|
||||
reader: &mut (dyn InvertedIndexReader + 'a),
|
||||
metrics: Option<&'b mut InvertedIndexReadMetrics>,
|
||||
) -> Result<ApplyOutput>;
|
||||
|
||||
/// Returns the memory usage of the applier.
|
||||
|
||||
@@ -19,7 +19,7 @@ use greptime_proto::v1::index::InvertedIndexMetas;
|
||||
|
||||
use crate::bitmap::Bitmap;
|
||||
use crate::inverted_index::error::{IndexNotFoundSnafu, Result};
|
||||
use crate::inverted_index::format::reader::InvertedIndexReader;
|
||||
use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
|
||||
use crate::inverted_index::search::fst_apply::{
|
||||
FstApplier, IntersectionFstApplier, KeysFstApplier,
|
||||
};
|
||||
@@ -43,12 +43,14 @@ pub struct PredicatesIndexApplier {
|
||||
impl IndexApplier for PredicatesIndexApplier {
|
||||
/// Applies all `FstApplier`s to the data in the inverted index reader, intersecting the individual
|
||||
/// bitmaps obtained for each index to result in a final set of indices.
|
||||
async fn apply<'a>(
|
||||
async fn apply<'a, 'b>(
|
||||
&self,
|
||||
context: SearchContext,
|
||||
reader: &mut (dyn InvertedIndexReader + 'a),
|
||||
metrics: Option<&'b mut InvertedIndexReadMetrics>,
|
||||
) -> Result<ApplyOutput> {
|
||||
let metadata = reader.metadata().await?;
|
||||
let mut metrics = metrics;
|
||||
let metadata = reader.metadata(metrics.as_deref_mut()).await?;
|
||||
let mut output = ApplyOutput {
|
||||
matched_segment_ids: Bitmap::new_bitvec(),
|
||||
total_row_count: metadata.total_row_count as _,
|
||||
@@ -84,7 +86,7 @@ impl IndexApplier for PredicatesIndexApplier {
|
||||
return Ok(output);
|
||||
}
|
||||
|
||||
let fsts = reader.fst_vec(&fst_ranges).await?;
|
||||
let fsts = reader.fst_vec(&fst_ranges, metrics.as_deref_mut()).await?;
|
||||
let value_and_meta_vec = fsts
|
||||
.into_iter()
|
||||
.zip(appliers)
|
||||
@@ -92,7 +94,7 @@ impl IndexApplier for PredicatesIndexApplier {
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut mapper = ParallelFstValuesMapper::new(reader);
|
||||
let mut bm_vec = mapper.map_values_vec(&value_and_meta_vec).await?;
|
||||
let mut bm_vec = mapper.map_values_vec(&value_and_meta_vec, metrics).await?;
|
||||
|
||||
let mut bitmap = bm_vec.pop().unwrap(); // SAFETY: `fst_ranges` is not empty
|
||||
for bm in bm_vec {
|
||||
@@ -221,26 +223,28 @@ mod tests {
|
||||
let mut mock_reader = MockInvertedIndexReader::new();
|
||||
mock_reader
|
||||
.expect_metadata()
|
||||
.returning(|| Ok(mock_metas([("tag-0", 0)])));
|
||||
mock_reader.expect_fst_vec().returning(|_ranges| {
|
||||
.returning(|_| Ok(mock_metas([("tag-0", 0)])));
|
||||
mock_reader.expect_fst_vec().returning(|_ranges, _metrics| {
|
||||
Ok(vec![
|
||||
FstMap::from_iter([(b"tag-0_value-0", fst_value(2, 1))]).unwrap(),
|
||||
])
|
||||
});
|
||||
|
||||
mock_reader.expect_bitmap_deque().returning(|arg| {
|
||||
assert_eq!(arg.len(), 1);
|
||||
let range = &arg[0].0;
|
||||
let bitmap_type = arg[0].1;
|
||||
assert_eq!(*range, 2..3);
|
||||
assert_eq!(bitmap_type, BitmapType::Roaring);
|
||||
Ok(VecDeque::from([Bitmap::from_lsb0_bytes(
|
||||
&[0b10101010],
|
||||
bitmap_type,
|
||||
)]))
|
||||
});
|
||||
mock_reader
|
||||
.expect_bitmap_deque()
|
||||
.returning(|arg, _metrics| {
|
||||
assert_eq!(arg.len(), 1);
|
||||
let range = &arg[0].0;
|
||||
let bitmap_type = arg[0].1;
|
||||
assert_eq!(*range, 2..3);
|
||||
assert_eq!(bitmap_type, BitmapType::Roaring);
|
||||
Ok(VecDeque::from([Bitmap::from_lsb0_bytes(
|
||||
&[0b10101010],
|
||||
bitmap_type,
|
||||
)]))
|
||||
});
|
||||
let output = applier
|
||||
.apply(SearchContext::default(), &mut mock_reader)
|
||||
.apply(SearchContext::default(), &mut mock_reader, None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
@@ -252,14 +256,14 @@ mod tests {
|
||||
let mut mock_reader = MockInvertedIndexReader::new();
|
||||
mock_reader
|
||||
.expect_metadata()
|
||||
.returning(|| Ok(mock_metas([("tag-0", 0)])));
|
||||
mock_reader.expect_fst_vec().returning(|_range| {
|
||||
.returning(|_| Ok(mock_metas([("tag-0", 0)])));
|
||||
mock_reader.expect_fst_vec().returning(|_range, _metrics| {
|
||||
Ok(vec![
|
||||
FstMap::from_iter([(b"tag-0_value-1", fst_value(2, 1))]).unwrap(),
|
||||
])
|
||||
});
|
||||
let output = applier
|
||||
.apply(SearchContext::default(), &mut mock_reader)
|
||||
.apply(SearchContext::default(), &mut mock_reader, None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(output.matched_segment_ids.count_ones(), 0);
|
||||
@@ -279,8 +283,8 @@ mod tests {
|
||||
let mut mock_reader = MockInvertedIndexReader::new();
|
||||
mock_reader
|
||||
.expect_metadata()
|
||||
.returning(|| Ok(mock_metas([("tag-0", 0), ("tag-1", 1)])));
|
||||
mock_reader.expect_fst_vec().returning(|ranges| {
|
||||
.returning(|_| Ok(mock_metas([("tag-0", 0), ("tag-1", 1)])));
|
||||
mock_reader.expect_fst_vec().returning(|ranges, _metrics| {
|
||||
let mut output = vec![];
|
||||
for range in ranges {
|
||||
match range.start {
|
||||
@@ -293,27 +297,29 @@ mod tests {
|
||||
}
|
||||
Ok(output)
|
||||
});
|
||||
mock_reader.expect_bitmap_deque().returning(|ranges| {
|
||||
let mut output = VecDeque::new();
|
||||
for (range, bitmap_type) in ranges {
|
||||
let offset = range.start;
|
||||
let size = range.end - range.start;
|
||||
match (offset, size, bitmap_type) {
|
||||
(1, 1, BitmapType::Roaring) => {
|
||||
output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type))
|
||||
mock_reader
|
||||
.expect_bitmap_deque()
|
||||
.returning(|ranges, _metrics| {
|
||||
let mut output = VecDeque::new();
|
||||
for (range, bitmap_type) in ranges {
|
||||
let offset = range.start;
|
||||
let size = range.end - range.start;
|
||||
match (offset, size, bitmap_type) {
|
||||
(1, 1, BitmapType::Roaring) => {
|
||||
output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type))
|
||||
}
|
||||
(2, 1, BitmapType::Roaring) => {
|
||||
output.push_back(Bitmap::from_lsb0_bytes(&[0b11011011], *bitmap_type))
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
(2, 1, BitmapType::Roaring) => {
|
||||
output.push_back(Bitmap::from_lsb0_bytes(&[0b11011011], *bitmap_type))
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(output)
|
||||
});
|
||||
Ok(output)
|
||||
});
|
||||
|
||||
let output = applier
|
||||
.apply(SearchContext::default(), &mut mock_reader)
|
||||
.apply(SearchContext::default(), &mut mock_reader, None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
@@ -331,10 +337,10 @@ mod tests {
|
||||
let mut mock_reader: MockInvertedIndexReader = MockInvertedIndexReader::new();
|
||||
mock_reader
|
||||
.expect_metadata()
|
||||
.returning(|| Ok(mock_metas([("tag-0", 0)])));
|
||||
.returning(|_| Ok(mock_metas([("tag-0", 0)])));
|
||||
|
||||
let output = applier
|
||||
.apply(SearchContext::default(), &mut mock_reader)
|
||||
.apply(SearchContext::default(), &mut mock_reader, None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(output.matched_segment_ids, Bitmap::full_bitvec(8)); // full range to scan
|
||||
@@ -343,7 +349,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_index_applier_with_empty_index() {
|
||||
let mut mock_reader = MockInvertedIndexReader::new();
|
||||
mock_reader.expect_metadata().returning(move || {
|
||||
mock_reader.expect_metadata().returning(move |_| {
|
||||
Ok(Arc::new(InvertedIndexMetas {
|
||||
total_row_count: 0, // No rows
|
||||
segment_row_count: 1,
|
||||
@@ -359,7 +365,7 @@ mod tests {
|
||||
};
|
||||
|
||||
let output = applier
|
||||
.apply(SearchContext::default(), &mut mock_reader)
|
||||
.apply(SearchContext::default(), &mut mock_reader, None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(output.matched_segment_ids.is_empty());
|
||||
@@ -370,7 +376,7 @@ mod tests {
|
||||
let mut mock_reader = MockInvertedIndexReader::new();
|
||||
mock_reader
|
||||
.expect_metadata()
|
||||
.returning(|| Ok(mock_metas(vec![])));
|
||||
.returning(|_| Ok(mock_metas(vec![])));
|
||||
|
||||
let mut mock_fst_applier = MockFstApplier::new();
|
||||
mock_fst_applier.expect_apply().never();
|
||||
@@ -385,6 +391,7 @@ mod tests {
|
||||
index_not_found_strategy: IndexNotFoundStrategy::ThrowError,
|
||||
},
|
||||
&mut mock_reader,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
assert!(matches!(result, Err(Error::IndexNotFound { .. })));
|
||||
@@ -395,6 +402,7 @@ mod tests {
|
||||
index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
|
||||
},
|
||||
&mut mock_reader,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -406,6 +414,7 @@ mod tests {
|
||||
index_not_found_strategy: IndexNotFoundStrategy::Ignore,
|
||||
},
|
||||
&mut mock_reader,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -44,6 +44,7 @@ use crate::cache::write_cache::WriteCacheRef;
|
||||
use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
|
||||
use crate::read::Batch;
|
||||
use crate::sst::file::RegionFileId;
|
||||
use crate::sst::parquet::reader::MetadataCacheMetrics;
|
||||
|
||||
/// Metrics type key for sst meta.
|
||||
const SST_META_TYPE: &str = "sst_meta";
|
||||
@@ -90,6 +91,32 @@ impl CacheStrategy {
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets parquet metadata with cache metrics tracking.
|
||||
/// Returns the metadata and updates the provided metrics.
|
||||
pub(crate) async fn get_parquet_meta_data_with_metrics(
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
metrics: &mut MetadataCacheMetrics,
|
||||
) -> Option<Arc<ParquetMetaData>> {
|
||||
match self {
|
||||
CacheStrategy::EnableAll(cache_manager) => {
|
||||
cache_manager
|
||||
.get_parquet_meta_data_with_metrics(file_id, metrics)
|
||||
.await
|
||||
}
|
||||
CacheStrategy::Compaction(cache_manager) => {
|
||||
cache_manager
|
||||
.get_parquet_meta_data_with_metrics(file_id, metrics)
|
||||
.await
|
||||
}
|
||||
CacheStrategy::Disabled => {
|
||||
metrics.mem_cache_miss += 1;
|
||||
metrics.file_cache_miss += 1;
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
|
||||
pub fn get_parquet_meta_data_from_mem_cache(
|
||||
&self,
|
||||
@@ -317,6 +344,36 @@ impl CacheManager {
|
||||
None
|
||||
}
|
||||
|
||||
/// Gets cached [ParquetMetaData] with metrics tracking.
|
||||
/// Tries in-memory cache first, then file cache, updating metrics accordingly.
|
||||
pub(crate) async fn get_parquet_meta_data_with_metrics(
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
metrics: &mut MetadataCacheMetrics,
|
||||
) -> Option<Arc<ParquetMetaData>> {
|
||||
// Try to get metadata from sst meta cache
|
||||
if let Some(metadata) = self.get_parquet_meta_data_from_mem_cache_inner(file_id) {
|
||||
metrics.mem_cache_hit += 1;
|
||||
return Some(metadata);
|
||||
}
|
||||
metrics.mem_cache_miss += 1;
|
||||
|
||||
// Try to get metadata from write cache
|
||||
let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
|
||||
if let Some(write_cache) = &self.write_cache
|
||||
&& let Some(metadata) = write_cache.file_cache().get_parquet_meta_data(key).await
|
||||
{
|
||||
metrics.file_cache_hit += 1;
|
||||
let metadata = Arc::new(metadata);
|
||||
// Put metadata into sst meta cache
|
||||
self.put_parquet_meta_data(file_id, metadata.clone());
|
||||
return Some(metadata);
|
||||
};
|
||||
metrics.file_cache_miss += 1;
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
/// Gets cached [ParquetMetaData] from in-memory cache.
|
||||
/// This method does not perform I/O.
|
||||
pub fn get_parquet_meta_data_from_mem_cache(
|
||||
@@ -330,6 +387,17 @@ impl CacheManager {
|
||||
})
|
||||
}
|
||||
|
||||
/// Gets cached [ParquetMetaData] from in-memory cache without updating global metrics.
|
||||
/// This is used by `get_parquet_meta_data_with_metrics` to avoid double counting.
|
||||
fn get_parquet_meta_data_from_mem_cache_inner(
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
) -> Option<Arc<ParquetMetaData>> {
|
||||
self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
|
||||
sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()))
|
||||
})
|
||||
}
|
||||
|
||||
/// Puts [ParquetMetaData] into the cache.
|
||||
pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
|
||||
if let Some(cache) = &self.sst_meta_cache {
|
||||
|
||||
42
src/mito2/src/cache/index.rs
vendored
42
src/mito2/src/cache/index.rs
vendored
@@ -31,6 +31,29 @@ const INDEX_METADATA_TYPE: &str = "index_metadata";
|
||||
/// Metrics for index content.
|
||||
const INDEX_CONTENT_TYPE: &str = "index_content";
|
||||
|
||||
/// Metrics collected from IndexCache operations.
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct IndexCacheMetrics {
|
||||
/// Number of cache hits.
|
||||
pub cache_hit: usize,
|
||||
/// Number of cache misses.
|
||||
pub cache_miss: usize,
|
||||
/// Number of pages accessed.
|
||||
pub num_pages: usize,
|
||||
/// Total bytes from pages.
|
||||
pub page_bytes: u64,
|
||||
}
|
||||
|
||||
impl IndexCacheMetrics {
|
||||
/// Merges another set of metrics into this one.
|
||||
pub fn merge(&mut self, other: &Self) {
|
||||
self.cache_hit += other.cache_hit;
|
||||
self.cache_miss += other.cache_miss;
|
||||
self.num_pages += other.num_pages;
|
||||
self.page_bytes += other.page_bytes;
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
pub struct PageKey {
|
||||
page_id: u64,
|
||||
@@ -160,18 +183,20 @@ where
|
||||
offset: u64,
|
||||
size: u32,
|
||||
load: F,
|
||||
) -> Result<Vec<u8>, E>
|
||||
) -> Result<(Vec<u8>, IndexCacheMetrics), E>
|
||||
where
|
||||
F: Fn(Vec<Range<u64>>) -> Fut,
|
||||
Fut: Future<Output = Result<Vec<Bytes>, E>>,
|
||||
E: std::error::Error,
|
||||
{
|
||||
let mut metrics = IndexCacheMetrics::default();
|
||||
let page_keys =
|
||||
PageKey::generate_page_keys(offset, size, self.page_size).collect::<Vec<_>>();
|
||||
// Size is 0, return empty data.
|
||||
if page_keys.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
return Ok((Vec::new(), metrics));
|
||||
}
|
||||
metrics.num_pages = page_keys.len();
|
||||
let mut data = Vec::with_capacity(page_keys.len());
|
||||
data.resize(page_keys.len(), Bytes::new());
|
||||
let mut cache_miss_range = vec![];
|
||||
@@ -182,10 +207,13 @@ where
|
||||
match self.get_page(key, *page_key) {
|
||||
Some(page) => {
|
||||
CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
|
||||
metrics.cache_hit += 1;
|
||||
metrics.page_bytes += page.len() as u64;
|
||||
data[i] = page;
|
||||
}
|
||||
None => {
|
||||
CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
|
||||
metrics.cache_miss += 1;
|
||||
let base_offset = page_key.page_id * self.page_size;
|
||||
let pruned_size = if i == last_index {
|
||||
prune_size(page_keys.iter(), file_size, self.page_size)
|
||||
@@ -201,14 +229,18 @@ where
|
||||
let pages = load(cache_miss_range).await?;
|
||||
for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) {
|
||||
let page_key = page_keys[i];
|
||||
metrics.page_bytes += page.len() as u64;
|
||||
data[i] = page.clone();
|
||||
self.put_page(key, page_key, page.clone());
|
||||
}
|
||||
}
|
||||
let buffer = Buffer::from_iter(data.into_iter());
|
||||
Ok(buffer
|
||||
.slice(PageKey::calculate_range(offset, size, self.page_size))
|
||||
.to_vec())
|
||||
Ok((
|
||||
buffer
|
||||
.slice(PageKey::calculate_range(offset, size, self.page_size))
|
||||
.to_vec(),
|
||||
metrics,
|
||||
))
|
||||
}
|
||||
|
||||
fn get_page(&self, key: K, page_key: PageKey) -> Option<Bytes> {
|
||||
|
||||
65
src/mito2/src/cache/index/bloom_filter_index.rs
vendored
65
src/mito2/src/cache/index/bloom_filter_index.rs
vendored
@@ -14,12 +14,13 @@
|
||||
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use api::v1::index::{BloomFilterLoc, BloomFilterMeta};
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use index::bloom_filter::error::Result;
|
||||
use index::bloom_filter::reader::BloomFilterReader;
|
||||
use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader};
|
||||
use store_api::storage::{ColumnId, FileId};
|
||||
|
||||
use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
|
||||
@@ -114,51 +115,93 @@ impl<R> CachedBloomFilterIndexBlobReader<R> {
|
||||
|
||||
#[async_trait]
|
||||
impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBlobReader<R> {
|
||||
async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes> {
|
||||
async fn range_read(
|
||||
&self,
|
||||
offset: u64,
|
||||
size: u32,
|
||||
metrics: Option<&mut BloomFilterReadMetrics>,
|
||||
) -> Result<Bytes> {
|
||||
let start = metrics.as_ref().map(|_| Instant::now());
|
||||
let inner = &self.inner;
|
||||
self.cache
|
||||
let (result, cache_metrics) = self
|
||||
.cache
|
||||
.get_or_load(
|
||||
(self.file_id, self.column_id, self.tag),
|
||||
self.blob_size,
|
||||
offset,
|
||||
size,
|
||||
move |ranges| async move { inner.read_vec(&ranges).await },
|
||||
move |ranges| async move { inner.read_vec(&ranges, None).await },
|
||||
)
|
||||
.await
|
||||
.map(|b| b.into())
|
||||
.await?;
|
||||
|
||||
if let Some(m) = metrics {
|
||||
m.total_ranges += cache_metrics.num_pages;
|
||||
m.total_bytes += cache_metrics.page_bytes;
|
||||
m.cache_hit += cache_metrics.cache_hit;
|
||||
m.cache_miss += cache_metrics.cache_miss;
|
||||
if let Some(start) = start {
|
||||
m.fetch_elapsed += start.elapsed();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result.into())
|
||||
}
|
||||
|
||||
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
|
||||
async fn read_vec(
|
||||
&self,
|
||||
ranges: &[Range<u64>],
|
||||
metrics: Option<&mut BloomFilterReadMetrics>,
|
||||
) -> Result<Vec<Bytes>> {
|
||||
let start = metrics.as_ref().map(|_| Instant::now());
|
||||
|
||||
let mut pages = Vec::with_capacity(ranges.len());
|
||||
let mut total_cache_metrics = crate::cache::index::IndexCacheMetrics::default();
|
||||
for range in ranges {
|
||||
let inner = &self.inner;
|
||||
let page = self
|
||||
let (page, cache_metrics) = self
|
||||
.cache
|
||||
.get_or_load(
|
||||
(self.file_id, self.column_id, self.tag),
|
||||
self.blob_size,
|
||||
range.start,
|
||||
(range.end - range.start) as u32,
|
||||
move |ranges| async move { inner.read_vec(&ranges).await },
|
||||
move |ranges| async move { inner.read_vec(&ranges, None).await },
|
||||
)
|
||||
.await?;
|
||||
|
||||
total_cache_metrics.merge(&cache_metrics);
|
||||
pages.push(Bytes::from(page));
|
||||
}
|
||||
|
||||
if let Some(m) = metrics {
|
||||
m.total_ranges += total_cache_metrics.num_pages;
|
||||
m.total_bytes += total_cache_metrics.page_bytes;
|
||||
m.cache_hit += total_cache_metrics.cache_hit;
|
||||
m.cache_miss += total_cache_metrics.cache_miss;
|
||||
if let Some(start) = start {
|
||||
m.fetch_elapsed += start.elapsed();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(pages)
|
||||
}
|
||||
|
||||
/// Reads the meta information of the bloom filter.
|
||||
async fn metadata(&self) -> Result<BloomFilterMeta> {
|
||||
async fn metadata(
|
||||
&self,
|
||||
metrics: Option<&mut BloomFilterReadMetrics>,
|
||||
) -> Result<BloomFilterMeta> {
|
||||
if let Some(cached) = self
|
||||
.cache
|
||||
.get_metadata((self.file_id, self.column_id, self.tag))
|
||||
{
|
||||
CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
|
||||
if let Some(m) = metrics {
|
||||
m.cache_hit += 1;
|
||||
}
|
||||
Ok((*cached).clone())
|
||||
} else {
|
||||
let meta = self.inner.metadata().await?;
|
||||
let meta = self.inner.metadata(metrics).await?;
|
||||
self.cache.put_metadata(
|
||||
(self.file_id, self.column_id, self.tag),
|
||||
Arc::new(meta.clone()),
|
||||
|
||||
113
src/mito2/src/cache/index/inverted_index.rs
vendored
113
src/mito2/src/cache/index/inverted_index.rs
vendored
@@ -14,12 +14,13 @@
|
||||
|
||||
use core::ops::Range;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use api::v1::index::InvertedIndexMetas;
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use index::inverted_index::error::Result;
|
||||
use index::inverted_index::format::reader::InvertedIndexReader;
|
||||
use index::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
|
||||
use prost::Message;
|
||||
use store_api::storage::FileId;
|
||||
|
||||
@@ -83,46 +84,86 @@ impl<R> CachedInvertedIndexBlobReader<R> {
|
||||
|
||||
#[async_trait]
|
||||
impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobReader<R> {
|
||||
async fn range_read(&self, offset: u64, size: u32) -> Result<Vec<u8>> {
|
||||
async fn range_read<'a>(
|
||||
&self,
|
||||
offset: u64,
|
||||
size: u32,
|
||||
metrics: Option<&'a mut InvertedIndexReadMetrics>,
|
||||
) -> Result<Vec<u8>> {
|
||||
let start = metrics.as_ref().map(|_| Instant::now());
|
||||
|
||||
let inner = &self.inner;
|
||||
self.cache
|
||||
let (result, cache_metrics) = self
|
||||
.cache
|
||||
.get_or_load(
|
||||
self.file_id,
|
||||
self.blob_size,
|
||||
offset,
|
||||
size,
|
||||
move |ranges| async move { inner.read_vec(&ranges).await },
|
||||
move |ranges| async move { inner.read_vec(&ranges, None).await },
|
||||
)
|
||||
.await
|
||||
.await?;
|
||||
|
||||
if let Some(m) = metrics {
|
||||
m.total_bytes += cache_metrics.page_bytes;
|
||||
m.total_ranges += cache_metrics.num_pages;
|
||||
m.cache_hit += cache_metrics.cache_hit;
|
||||
m.cache_miss += cache_metrics.cache_miss;
|
||||
m.fetch_elapsed += start.unwrap().elapsed();
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
|
||||
async fn read_vec<'a>(
|
||||
&self,
|
||||
ranges: &[Range<u64>],
|
||||
metrics: Option<&'a mut InvertedIndexReadMetrics>,
|
||||
) -> Result<Vec<Bytes>> {
|
||||
let start = metrics.as_ref().map(|_| Instant::now());
|
||||
|
||||
let mut pages = Vec::with_capacity(ranges.len());
|
||||
let mut total_cache_metrics = crate::cache::index::IndexCacheMetrics::default();
|
||||
for range in ranges {
|
||||
let inner = &self.inner;
|
||||
let page = self
|
||||
let (page, cache_metrics) = self
|
||||
.cache
|
||||
.get_or_load(
|
||||
self.file_id,
|
||||
self.blob_size,
|
||||
range.start,
|
||||
(range.end - range.start) as u32,
|
||||
move |ranges| async move { inner.read_vec(&ranges).await },
|
||||
move |ranges| async move { inner.read_vec(&ranges, None).await },
|
||||
)
|
||||
.await?;
|
||||
|
||||
total_cache_metrics.merge(&cache_metrics);
|
||||
pages.push(Bytes::from(page));
|
||||
}
|
||||
|
||||
if let Some(m) = metrics {
|
||||
m.total_bytes += total_cache_metrics.page_bytes;
|
||||
m.total_ranges += total_cache_metrics.num_pages;
|
||||
m.cache_hit += total_cache_metrics.cache_hit;
|
||||
m.cache_miss += total_cache_metrics.cache_miss;
|
||||
m.fetch_elapsed += start.unwrap().elapsed();
|
||||
}
|
||||
|
||||
Ok(pages)
|
||||
}
|
||||
|
||||
async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>> {
|
||||
async fn metadata<'a>(
|
||||
&self,
|
||||
metrics: Option<&'a mut InvertedIndexReadMetrics>,
|
||||
) -> Result<Arc<InvertedIndexMetas>> {
|
||||
if let Some(cached) = self.cache.get_metadata(self.file_id) {
|
||||
CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
|
||||
if let Some(m) = metrics {
|
||||
m.cache_hit += 1;
|
||||
}
|
||||
Ok(cached)
|
||||
} else {
|
||||
let meta = self.inner.metadata().await?;
|
||||
let meta = self.inner.metadata(metrics).await?;
|
||||
self.cache.put_metadata(self.file_id, meta.clone());
|
||||
CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
|
||||
Ok(meta)
|
||||
@@ -277,7 +318,7 @@ mod test {
|
||||
reader,
|
||||
Arc::new(InvertedIndexCache::new(8192, 8192, 50)),
|
||||
);
|
||||
let metadata = cached_reader.metadata().await.unwrap();
|
||||
let metadata = cached_reader.metadata(None).await.unwrap();
|
||||
assert_eq!(metadata.total_row_count, 8);
|
||||
assert_eq!(metadata.segment_row_count, 1);
|
||||
assert_eq!(metadata.metas.len(), 2);
|
||||
@@ -292,13 +333,19 @@ mod test {
|
||||
.fst(
|
||||
tag0.base_offset + tag0.relative_fst_offset as u64,
|
||||
tag0.fst_size,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(fst0.len(), 3);
|
||||
let [offset, size] = unpack(fst0.get(b"a").unwrap());
|
||||
let bitmap = cached_reader
|
||||
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
|
||||
.bitmap(
|
||||
tag0.base_offset + offset as u64,
|
||||
size,
|
||||
BitmapType::Roaring,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
@@ -307,7 +354,12 @@ mod test {
|
||||
);
|
||||
let [offset, size] = unpack(fst0.get(b"b").unwrap());
|
||||
let bitmap = cached_reader
|
||||
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
|
||||
.bitmap(
|
||||
tag0.base_offset + offset as u64,
|
||||
size,
|
||||
BitmapType::Roaring,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
@@ -316,7 +368,12 @@ mod test {
|
||||
);
|
||||
let [offset, size] = unpack(fst0.get(b"c").unwrap());
|
||||
let bitmap = cached_reader
|
||||
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
|
||||
.bitmap(
|
||||
tag0.base_offset + offset as u64,
|
||||
size,
|
||||
BitmapType::Roaring,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
@@ -335,13 +392,19 @@ mod test {
|
||||
.fst(
|
||||
tag1.base_offset + tag1.relative_fst_offset as u64,
|
||||
tag1.fst_size,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(fst1.len(), 3);
|
||||
let [offset, size] = unpack(fst1.get(b"x").unwrap());
|
||||
let bitmap = cached_reader
|
||||
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
|
||||
.bitmap(
|
||||
tag1.base_offset + offset as u64,
|
||||
size,
|
||||
BitmapType::Roaring,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
@@ -350,7 +413,12 @@ mod test {
|
||||
);
|
||||
let [offset, size] = unpack(fst1.get(b"y").unwrap());
|
||||
let bitmap = cached_reader
|
||||
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
|
||||
.bitmap(
|
||||
tag1.base_offset + offset as u64,
|
||||
size,
|
||||
BitmapType::Roaring,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
@@ -359,7 +427,12 @@ mod test {
|
||||
);
|
||||
let [offset, size] = unpack(fst1.get(b"z").unwrap());
|
||||
let bitmap = cached_reader
|
||||
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
|
||||
.bitmap(
|
||||
tag1.base_offset + offset as u64,
|
||||
size,
|
||||
BitmapType::Roaring,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
@@ -372,16 +445,16 @@ mod test {
|
||||
for _ in 0..FUZZ_REPEAT_TIMES {
|
||||
let offset = rng.random_range(0..file_size);
|
||||
let size = rng.random_range(0..file_size as u32 - offset as u32);
|
||||
let expected = cached_reader.range_read(offset, size).await.unwrap();
|
||||
let expected = cached_reader.range_read(offset, size, None).await.unwrap();
|
||||
let inner = &cached_reader.inner;
|
||||
let read = cached_reader
|
||||
let (read, _cache_metrics) = cached_reader
|
||||
.cache
|
||||
.get_or_load(
|
||||
cached_reader.file_id,
|
||||
file_size,
|
||||
offset,
|
||||
size,
|
||||
|ranges| async move { inner.read_vec(&ranges).await },
|
||||
|ranges| async move { inner.read_vec(&ranges, None).await },
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -233,7 +233,7 @@ async fn collect_inverted_entries(
|
||||
InvertedIndexBlobReader::new(blob_reader),
|
||||
cache.clone(),
|
||||
);
|
||||
match reader.metadata().await {
|
||||
match reader.metadata(None).await {
|
||||
Ok(metas) => metas,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
@@ -247,7 +247,7 @@ async fn collect_inverted_entries(
|
||||
}
|
||||
} else {
|
||||
let reader = InvertedIndexBlobReader::new(blob_reader);
|
||||
match reader.metadata().await {
|
||||
match reader.metadata(None).await {
|
||||
Ok(metas) => metas,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
@@ -318,10 +318,10 @@ async fn try_read_bloom_meta(
|
||||
bloom_reader,
|
||||
cache.clone(),
|
||||
)
|
||||
.metadata()
|
||||
.metadata(None)
|
||||
.await
|
||||
}
|
||||
_ => bloom_reader.metadata().await,
|
||||
_ => bloom_reader.metadata(None).await,
|
||||
};
|
||||
|
||||
match result {
|
||||
|
||||
@@ -41,10 +41,14 @@ use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
|
||||
use crate::read::scan_region::StreamContext;
|
||||
use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
|
||||
use crate::sst::file::FileTimeRange;
|
||||
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
|
||||
use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
|
||||
use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
|
||||
use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
|
||||
use crate::sst::parquet::file_range::FileRange;
|
||||
use crate::sst::parquet::flat_format::time_index_column_index;
|
||||
use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};
|
||||
use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
|
||||
use crate::sst::parquet::row_group::ParquetFetchMetrics;
|
||||
|
||||
/// Verbose scan metrics for a partition.
|
||||
#[derive(Default)]
|
||||
@@ -81,6 +85,8 @@ pub(crate) struct ScanMetricsSet {
|
||||
// SST related metrics:
|
||||
/// Duration to build file ranges.
|
||||
build_parts_cost: Duration,
|
||||
/// Duration to scan SST files.
|
||||
file_scan_cost: Duration,
|
||||
/// Number of row groups before filtering.
|
||||
rg_total: usize,
|
||||
/// Number of row groups filtered by fulltext index.
|
||||
@@ -126,6 +132,18 @@ pub(crate) struct ScanMetricsSet {
|
||||
|
||||
/// The stream reached EOF
|
||||
stream_eof: bool,
|
||||
|
||||
// Optional verbose metrics:
|
||||
/// Inverted index apply metrics.
|
||||
inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
|
||||
/// Bloom filter index apply metrics.
|
||||
bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
|
||||
/// Fulltext index apply metrics.
|
||||
fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
|
||||
/// Parquet fetch metrics.
|
||||
fetch_metrics: Option<ParquetFetchMetrics>,
|
||||
/// Metadata cache metrics.
|
||||
metadata_cache_metrics: Option<MetadataCacheMetrics>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for ScanMetricsSet {
|
||||
@@ -141,6 +159,7 @@ impl fmt::Debug for ScanMetricsSet {
|
||||
num_mem_ranges,
|
||||
num_file_ranges,
|
||||
build_parts_cost,
|
||||
file_scan_cost,
|
||||
rg_total,
|
||||
rg_fulltext_filtered,
|
||||
rg_inverted_filtered,
|
||||
@@ -166,6 +185,11 @@ impl fmt::Debug for ScanMetricsSet {
|
||||
mem_rows,
|
||||
mem_batches,
|
||||
mem_series,
|
||||
inverted_index_apply_metrics,
|
||||
bloom_filter_apply_metrics,
|
||||
fulltext_index_apply_metrics,
|
||||
fetch_metrics,
|
||||
metadata_cache_metrics,
|
||||
} = self;
|
||||
|
||||
// Write core metrics
|
||||
@@ -181,6 +205,7 @@ impl fmt::Debug for ScanMetricsSet {
|
||||
\"num_mem_ranges\":{num_mem_ranges}, \
|
||||
\"num_file_ranges\":{num_file_ranges}, \
|
||||
\"build_parts_cost\":\"{build_parts_cost:?}\", \
|
||||
\"file_scan_cost\":\"{file_scan_cost:?}\", \
|
||||
\"rg_total\":{rg_total}, \
|
||||
\"rows_before_filter\":{rows_before_filter}, \
|
||||
\"num_sst_record_batches\":{num_sst_record_batches}, \
|
||||
@@ -255,6 +280,23 @@ impl fmt::Debug for ScanMetricsSet {
|
||||
write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
|
||||
}
|
||||
|
||||
// Write optional verbose metrics
|
||||
if let Some(metrics) = inverted_index_apply_metrics {
|
||||
write!(f, ", \"inverted_index_apply_metrics\":{:?}", metrics)?;
|
||||
}
|
||||
if let Some(metrics) = bloom_filter_apply_metrics {
|
||||
write!(f, ", \"bloom_filter_apply_metrics\":{:?}", metrics)?;
|
||||
}
|
||||
if let Some(metrics) = fulltext_index_apply_metrics {
|
||||
write!(f, ", \"fulltext_index_apply_metrics\":{:?}", metrics)?;
|
||||
}
|
||||
if let Some(metrics) = fetch_metrics {
|
||||
write!(f, ", \"fetch_metrics\":{:?}", metrics)?;
|
||||
}
|
||||
if let Some(metrics) = metadata_cache_metrics {
|
||||
write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?;
|
||||
}
|
||||
|
||||
write!(f, ", \"stream_eof\":{stream_eof}}}")
|
||||
}
|
||||
}
|
||||
@@ -304,14 +346,20 @@ impl ScanMetricsSet {
|
||||
rows_inverted_filtered,
|
||||
rows_bloom_filtered,
|
||||
rows_precise_filtered,
|
||||
inverted_index_apply_metrics,
|
||||
bloom_filter_apply_metrics,
|
||||
fulltext_index_apply_metrics,
|
||||
},
|
||||
num_record_batches,
|
||||
num_batches,
|
||||
num_rows,
|
||||
scan_cost: _,
|
||||
scan_cost,
|
||||
metadata_cache_metrics,
|
||||
fetch_metrics,
|
||||
} = other;
|
||||
|
||||
self.build_parts_cost += *build_cost;
|
||||
self.file_scan_cost += *scan_cost;
|
||||
|
||||
self.rg_total += *rg_total;
|
||||
self.rg_fulltext_filtered += *rg_fulltext_filtered;
|
||||
@@ -328,6 +376,31 @@ impl ScanMetricsSet {
|
||||
self.num_sst_record_batches += *num_record_batches;
|
||||
self.num_sst_batches += *num_batches;
|
||||
self.num_sst_rows += *num_rows;
|
||||
|
||||
// Merge optional verbose metrics
|
||||
if let Some(metrics) = inverted_index_apply_metrics {
|
||||
self.inverted_index_apply_metrics
|
||||
.get_or_insert_with(InvertedIndexApplyMetrics::default)
|
||||
.merge_from(metrics);
|
||||
}
|
||||
if let Some(metrics) = bloom_filter_apply_metrics {
|
||||
self.bloom_filter_apply_metrics
|
||||
.get_or_insert_with(BloomFilterIndexApplyMetrics::default)
|
||||
.merge_from(metrics);
|
||||
}
|
||||
if let Some(metrics) = fulltext_index_apply_metrics {
|
||||
self.fulltext_index_apply_metrics
|
||||
.get_or_insert_with(FulltextIndexApplyMetrics::default)
|
||||
.merge_from(metrics);
|
||||
}
|
||||
if let Some(metrics) = fetch_metrics {
|
||||
self.fetch_metrics
|
||||
.get_or_insert_with(ParquetFetchMetrics::default)
|
||||
.merge_from(metrics);
|
||||
}
|
||||
self.metadata_cache_metrics
|
||||
.get_or_insert_with(MetadataCacheMetrics::default)
|
||||
.merge_from(metadata_cache_metrics);
|
||||
}
|
||||
|
||||
/// Sets distributor metrics.
|
||||
@@ -615,6 +688,11 @@ impl PartitionMetrics {
|
||||
let mut metrics_set = self.0.metrics.lock().unwrap();
|
||||
metrics_set.set_distributor_metrics(metrics);
|
||||
}
|
||||
|
||||
/// Returns whether verbose explain is enabled.
|
||||
pub(crate) fn explain_verbose(&self) -> bool {
|
||||
self.0.explain_verbose
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for PartitionMetrics {
|
||||
@@ -768,6 +846,21 @@ fn can_split_series(num_rows: u64, num_series: u64) -> bool {
|
||||
num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
|
||||
}
|
||||
|
||||
/// Creates a new [ReaderFilterMetrics] with optional apply metrics initialized
|
||||
/// based on the `explain_verbose` flag.
|
||||
fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics {
|
||||
if explain_verbose {
|
||||
ReaderFilterMetrics {
|
||||
inverted_index_apply_metrics: Some(InvertedIndexApplyMetrics::default()),
|
||||
bloom_filter_apply_metrics: Some(BloomFilterIndexApplyMetrics::default()),
|
||||
fulltext_index_apply_metrics: Some(FulltextIndexApplyMetrics::default()),
|
||||
..Default::default()
|
||||
}
|
||||
} else {
|
||||
ReaderFilterMetrics::default()
|
||||
}
|
||||
}
|
||||
|
||||
/// Scans file ranges at `index`.
|
||||
pub(crate) async fn scan_file_ranges(
|
||||
stream_ctx: Arc<StreamContext>,
|
||||
@@ -776,7 +869,10 @@ pub(crate) async fn scan_file_ranges(
|
||||
read_type: &'static str,
|
||||
range_builder: Arc<RangeBuilderList>,
|
||||
) -> Result<impl Stream<Item = Result<Batch>>> {
|
||||
let mut reader_metrics = ReaderMetrics::default();
|
||||
let mut reader_metrics = ReaderMetrics {
|
||||
filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
|
||||
..Default::default()
|
||||
};
|
||||
let ranges = range_builder
|
||||
.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
|
||||
.await?;
|
||||
@@ -799,7 +895,10 @@ pub(crate) async fn scan_flat_file_ranges(
|
||||
read_type: &'static str,
|
||||
range_builder: Arc<RangeBuilderList>,
|
||||
) -> Result<impl Stream<Item = Result<RecordBatch>>> {
|
||||
let mut reader_metrics = ReaderMetrics::default();
|
||||
let mut reader_metrics = ReaderMetrics {
|
||||
filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
|
||||
..Default::default()
|
||||
};
|
||||
let ranges = range_builder
|
||||
.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
|
||||
.await?;
|
||||
@@ -822,10 +921,18 @@ pub fn build_file_range_scan_stream(
|
||||
ranges: SmallVec<[FileRange; 2]>,
|
||||
) -> impl Stream<Item = Result<Batch>> {
|
||||
try_stream! {
|
||||
let reader_metrics = &mut ReaderMetrics::default();
|
||||
let fetch_metrics = if part_metrics.explain_verbose() {
|
||||
Some(Arc::new(ParquetFetchMetrics::default()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let reader_metrics = &mut ReaderMetrics {
|
||||
fetch_metrics: fetch_metrics.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
for range in ranges {
|
||||
let build_reader_start = Instant::now();
|
||||
let reader = range.reader(stream_ctx.input.series_row_selector).await?;
|
||||
let reader = range.reader(stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await?;
|
||||
let build_cost = build_reader_start.elapsed();
|
||||
part_metrics.inc_build_reader_cost(build_cost);
|
||||
let compat_batch = range.compat_batch();
|
||||
@@ -857,10 +964,18 @@ pub fn build_flat_file_range_scan_stream(
|
||||
ranges: SmallVec<[FileRange; 2]>,
|
||||
) -> impl Stream<Item = Result<RecordBatch>> {
|
||||
try_stream! {
|
||||
let reader_metrics = &mut ReaderMetrics::default();
|
||||
let fetch_metrics = if part_metrics.explain_verbose() {
|
||||
Some(Arc::new(ParquetFetchMetrics::default()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let reader_metrics = &mut ReaderMetrics {
|
||||
fetch_metrics: fetch_metrics.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
for range in ranges {
|
||||
let build_reader_start = Instant::now();
|
||||
let mut reader = range.flat_reader().await?;
|
||||
let mut reader = range.flat_reader(fetch_metrics.as_deref()).await?;
|
||||
let build_cost = build_reader_start.elapsed();
|
||||
part_metrics.inc_build_reader_cost(build_cost);
|
||||
|
||||
|
||||
@@ -17,11 +17,14 @@ mod builder;
|
||||
use std::collections::BTreeMap;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use common_base::range_read::RangeReader;
|
||||
use common_telemetry::warn;
|
||||
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
|
||||
use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
|
||||
use index::bloom_filter::reader::{
|
||||
BloomFilterReadMetrics, BloomFilterReader, BloomFilterReaderImpl,
|
||||
};
|
||||
use index::target::IndexTarget;
|
||||
use object_store::ObjectStore;
|
||||
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
|
||||
@@ -47,6 +50,56 @@ use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
|
||||
pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
|
||||
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
|
||||
|
||||
/// Metrics for tracking bloom filter index apply operations.
|
||||
#[derive(Default, Clone)]
|
||||
pub struct BloomFilterIndexApplyMetrics {
|
||||
/// Total time spent applying the index.
|
||||
pub apply_elapsed: std::time::Duration,
|
||||
/// Number of blob cache misses.
|
||||
pub blob_cache_miss: usize,
|
||||
/// Total size of blobs read (in bytes).
|
||||
pub blob_read_bytes: u64,
|
||||
/// Metrics for bloom filter read operations.
|
||||
pub read_metrics: BloomFilterReadMetrics,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for BloomFilterIndexApplyMetrics {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{{")?;
|
||||
let mut first = true;
|
||||
|
||||
if !self.apply_elapsed.is_zero() {
|
||||
write!(f, "\"apply_elapsed\":\"{:?}\"", self.apply_elapsed)?;
|
||||
first = false;
|
||||
}
|
||||
if self.blob_cache_miss > 0 {
|
||||
if !first {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
write!(f, "\"blob_cache_miss\":{}", self.blob_cache_miss)?;
|
||||
first = false;
|
||||
}
|
||||
if self.blob_read_bytes > 0 {
|
||||
if !first {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
write!(f, "\"blob_read_bytes\":{}", self.blob_read_bytes)?;
|
||||
}
|
||||
|
||||
write!(f, "}}")
|
||||
}
|
||||
}
|
||||
|
||||
impl BloomFilterIndexApplyMetrics {
|
||||
/// Merges another metrics into this one.
|
||||
pub fn merge_from(&mut self, other: &Self) {
|
||||
self.apply_elapsed += other.apply_elapsed;
|
||||
self.blob_cache_miss += other.blob_cache_miss;
|
||||
self.blob_read_bytes += other.blob_read_bytes;
|
||||
self.read_metrics.merge_from(&other.read_metrics);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
|
||||
|
||||
/// `BloomFilterIndexApplier` applies bloom filter predicates to the SST file.
|
||||
@@ -133,15 +186,20 @@ impl BloomFilterIndexApplier {
|
||||
///
|
||||
/// Row group id existing in the returned result means that the row group is searched.
|
||||
/// Empty ranges means that the row group is searched but no rows are found.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `file_id` - The region file ID to apply predicates to
|
||||
/// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
|
||||
/// * `row_groups` - Iterator of row group lengths and whether to search in the row group
|
||||
/// * `metrics` - Optional mutable reference to collect metrics on demand
|
||||
pub async fn apply(
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
file_size_hint: Option<u64>,
|
||||
row_groups: impl Iterator<Item = (usize, bool)>,
|
||||
mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
|
||||
) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
|
||||
let _timer = INDEX_APPLY_ELAPSED
|
||||
.with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
|
||||
.start_timer();
|
||||
let apply_start = Instant::now();
|
||||
|
||||
// Calculates row groups' ranges based on start of the file.
|
||||
let mut input = Vec::with_capacity(row_groups.size_hint().0);
|
||||
@@ -163,7 +221,7 @@ impl BloomFilterIndexApplier {
|
||||
|
||||
for (column_id, predicates) in self.predicates.iter() {
|
||||
let blob = match self
|
||||
.blob_reader(file_id, *column_id, file_size_hint)
|
||||
.blob_reader(file_id, *column_id, file_size_hint, metrics.as_deref_mut())
|
||||
.await?
|
||||
{
|
||||
Some(blob) => blob,
|
||||
@@ -173,6 +231,9 @@ impl BloomFilterIndexApplier {
|
||||
// Create appropriate reader based on whether we have caching enabled
|
||||
if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
|
||||
let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
|
||||
if let Some(m) = &mut metrics {
|
||||
m.blob_read_bytes += blob_size;
|
||||
}
|
||||
let reader = CachedBloomFilterIndexBlobReader::new(
|
||||
file_id.file_id(),
|
||||
*column_id,
|
||||
@@ -181,12 +242,12 @@ impl BloomFilterIndexApplier {
|
||||
BloomFilterReaderImpl::new(blob),
|
||||
bloom_filter_cache.clone(),
|
||||
);
|
||||
self.apply_predicates(reader, predicates, &mut output)
|
||||
self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut())
|
||||
.await
|
||||
.context(ApplyBloomFilterIndexSnafu)?;
|
||||
} else {
|
||||
let reader = BloomFilterReaderImpl::new(blob);
|
||||
self.apply_predicates(reader, predicates, &mut output)
|
||||
self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut())
|
||||
.await
|
||||
.context(ApplyBloomFilterIndexSnafu)?;
|
||||
}
|
||||
@@ -201,6 +262,16 @@ impl BloomFilterIndexApplier {
|
||||
}
|
||||
}
|
||||
|
||||
// Record elapsed time to histogram and collect metrics if requested
|
||||
let elapsed = apply_start.elapsed();
|
||||
INDEX_APPLY_ELAPSED
|
||||
.with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
|
||||
.observe(elapsed.as_secs_f64());
|
||||
|
||||
if let Some(m) = metrics {
|
||||
m.apply_elapsed += elapsed;
|
||||
}
|
||||
|
||||
Ok(output)
|
||||
}
|
||||
|
||||
@@ -212,6 +283,7 @@ impl BloomFilterIndexApplier {
|
||||
file_id: RegionFileId,
|
||||
column_id: ColumnId,
|
||||
file_size_hint: Option<u64>,
|
||||
metrics: Option<&mut BloomFilterIndexApplyMetrics>,
|
||||
) -> Result<Option<BlobReader>> {
|
||||
let reader = match self
|
||||
.cached_blob_reader(file_id, column_id, file_size_hint)
|
||||
@@ -219,6 +291,9 @@ impl BloomFilterIndexApplier {
|
||||
{
|
||||
Ok(Some(puffin_reader)) => puffin_reader,
|
||||
other => {
|
||||
if let Some(m) = metrics {
|
||||
m.blob_cache_miss += 1;
|
||||
}
|
||||
if let Err(err) = other {
|
||||
// Blob not found means no index for this column
|
||||
if is_blob_not_found(&err) {
|
||||
@@ -320,6 +395,7 @@ impl BloomFilterIndexApplier {
|
||||
reader: R,
|
||||
predicates: &[InListPredicate],
|
||||
output: &mut [(usize, Vec<Range<usize>>)],
|
||||
mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
|
||||
) -> std::result::Result<(), index::bloom_filter::error::Error> {
|
||||
let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;
|
||||
|
||||
@@ -329,7 +405,10 @@ impl BloomFilterIndexApplier {
|
||||
continue;
|
||||
}
|
||||
|
||||
*row_group_output = applier.search(predicates, row_group_output).await?;
|
||||
let read_metrics = metrics.as_deref_mut().map(|m| &mut m.read_metrics);
|
||||
*row_group_output = applier
|
||||
.search(predicates, row_group_output, read_metrics)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -393,7 +472,7 @@ mod tests {
|
||||
|
||||
let applier = builder.build(&exprs).unwrap().unwrap();
|
||||
applier
|
||||
.apply(file_id, None, row_groups.into_iter())
|
||||
.apply(file_id, None, row_groups.into_iter(), None)
|
||||
.await
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
|
||||
@@ -637,17 +637,17 @@ pub(crate) mod tests {
|
||||
.unwrap();
|
||||
let reader = blob_guard.reader().await.unwrap();
|
||||
let bloom_filter = BloomFilterReaderImpl::new(reader);
|
||||
let metadata = bloom_filter.metadata().await.unwrap();
|
||||
let metadata = bloom_filter.metadata(None).await.unwrap();
|
||||
|
||||
assert_eq!(metadata.segment_count, 10);
|
||||
for i in 0..5 {
|
||||
let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
|
||||
let bf = bloom_filter.bloom_filter(loc).await.unwrap();
|
||||
let bf = bloom_filter.bloom_filter(loc, None).await.unwrap();
|
||||
assert!(bf.contains(b"tag1"));
|
||||
}
|
||||
for i in 5..10 {
|
||||
let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
|
||||
let bf = bloom_filter.bloom_filter(loc).await.unwrap();
|
||||
let bf = bloom_filter.bloom_filter(loc, None).await.unwrap();
|
||||
assert!(bf.contains(b"tag2"));
|
||||
}
|
||||
}
|
||||
@@ -662,13 +662,13 @@ pub(crate) mod tests {
|
||||
.unwrap();
|
||||
let reader = blob_guard.reader().await.unwrap();
|
||||
let bloom_filter = BloomFilterReaderImpl::new(reader);
|
||||
let metadata = bloom_filter.metadata().await.unwrap();
|
||||
let metadata = bloom_filter.metadata(None).await.unwrap();
|
||||
|
||||
assert_eq!(metadata.segment_count, 5);
|
||||
for i in 0u64..20 {
|
||||
let idx = i as usize / 4;
|
||||
let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[idx] as usize];
|
||||
let bf = bloom_filter.bloom_filter(loc).await.unwrap();
|
||||
let bf = bloom_filter.bloom_filter(loc, None).await.unwrap();
|
||||
let mut buf = vec![];
|
||||
IndexValueCodec::encode_nonnull_value(ValueRef::UInt64(i), &sort_field, &mut buf)
|
||||
.unwrap();
|
||||
|
||||
@@ -16,11 +16,12 @@ use std::collections::{BTreeMap, BTreeSet, HashSet};
|
||||
use std::iter;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use common_base::range_read::RangeReader;
|
||||
use common_telemetry::warn;
|
||||
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
|
||||
use index::bloom_filter::reader::BloomFilterReaderImpl;
|
||||
use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReaderImpl};
|
||||
use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
|
||||
use index::fulltext_index::tokenizer::{ChineseTokenizer, EnglishTokenizer, Tokenizer};
|
||||
use index::fulltext_index::{Analyzer, Config};
|
||||
@@ -53,6 +54,91 @@ use crate::sst::index::puffin_manager::{
|
||||
|
||||
pub mod builder;
|
||||
|
||||
/// Metrics for tracking fulltext index apply operations.
|
||||
#[derive(Default, Clone)]
|
||||
pub struct FulltextIndexApplyMetrics {
|
||||
/// Total time spent applying the index.
|
||||
pub apply_elapsed: std::time::Duration,
|
||||
/// Number of blob cache misses.
|
||||
pub blob_cache_miss: usize,
|
||||
/// Number of directory cache hits.
|
||||
pub dir_cache_hit: usize,
|
||||
/// Number of directory cache misses.
|
||||
pub dir_cache_miss: usize,
|
||||
/// Elapsed time to initialize directory data.
|
||||
pub dir_init_elapsed: std::time::Duration,
|
||||
/// Metrics for bloom filter reads.
|
||||
pub bloom_filter_read_metrics: BloomFilterReadMetrics,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for FulltextIndexApplyMetrics {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{{")?;
|
||||
let mut first = true;
|
||||
|
||||
if !self.apply_elapsed.is_zero() {
|
||||
write!(f, "\"apply_elapsed\":\"{:?}\"", self.apply_elapsed)?;
|
||||
first = false;
|
||||
}
|
||||
if self.blob_cache_miss > 0 {
|
||||
if !first {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
write!(f, "\"blob_cache_miss\":{}", self.blob_cache_miss)?;
|
||||
first = false;
|
||||
}
|
||||
if self.dir_cache_hit > 0 {
|
||||
if !first {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
write!(f, "\"dir_cache_hit\":{}", self.dir_cache_hit)?;
|
||||
first = false;
|
||||
}
|
||||
if self.dir_cache_miss > 0 {
|
||||
if !first {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
write!(f, "\"dir_cache_miss\":{}", self.dir_cache_miss)?;
|
||||
first = false;
|
||||
}
|
||||
if !self.dir_init_elapsed.is_zero() {
|
||||
if !first {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
write!(f, "\"dir_init_elapsed\":\"{:?}\"", self.dir_init_elapsed)?;
|
||||
}
|
||||
|
||||
write!(f, "}}")
|
||||
}
|
||||
}
|
||||
|
||||
impl FulltextIndexApplyMetrics {
|
||||
/// Collects metrics from a directory read operation.
|
||||
pub fn collect_dir_metrics(
|
||||
&mut self,
|
||||
elapsed: std::time::Duration,
|
||||
dir_metrics: puffin::puffin_manager::DirMetrics,
|
||||
) {
|
||||
self.dir_init_elapsed += elapsed;
|
||||
if dir_metrics.cache_hit {
|
||||
self.dir_cache_hit += 1;
|
||||
} else {
|
||||
self.dir_cache_miss += 1;
|
||||
}
|
||||
}
|
||||
|
||||
/// Merges another metrics into this one.
|
||||
pub fn merge_from(&mut self, other: &Self) {
|
||||
self.apply_elapsed += other.apply_elapsed;
|
||||
self.blob_cache_miss += other.blob_cache_miss;
|
||||
self.dir_cache_hit += other.dir_cache_hit;
|
||||
self.dir_cache_miss += other.dir_cache_miss;
|
||||
self.dir_init_elapsed += other.dir_init_elapsed;
|
||||
self.bloom_filter_read_metrics
|
||||
.merge_from(&other.bloom_filter_read_metrics);
|
||||
}
|
||||
}
|
||||
|
||||
/// `FulltextIndexApplier` is responsible for applying fulltext index to the provided SST files
|
||||
pub struct FulltextIndexApplier {
|
||||
/// Requests to be applied.
|
||||
@@ -124,14 +210,18 @@ impl FulltextIndexApplier {
|
||||
impl FulltextIndexApplier {
|
||||
/// Applies fine-grained fulltext index to the specified SST file.
|
||||
/// Returns the row ids that match the queries.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `file_id` - The region file ID to apply predicates to
|
||||
/// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
|
||||
/// * `metrics` - Optional mutable reference to collect metrics on demand
|
||||
pub async fn apply_fine(
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
file_size_hint: Option<u64>,
|
||||
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
|
||||
) -> Result<Option<BTreeSet<RowId>>> {
|
||||
let timer = INDEX_APPLY_ELAPSED
|
||||
.with_label_values(&[TYPE_FULLTEXT_INDEX])
|
||||
.start_timer();
|
||||
let apply_start = Instant::now();
|
||||
|
||||
let mut row_ids: Option<BTreeSet<RowId>> = None;
|
||||
for (column_id, request) in self.requests.iter() {
|
||||
@@ -140,7 +230,13 @@ impl FulltextIndexApplier {
|
||||
}
|
||||
|
||||
let Some(result) = self
|
||||
.apply_fine_one_column(file_size_hint, file_id, *column_id, request)
|
||||
.apply_fine_one_column(
|
||||
file_size_hint,
|
||||
file_id,
|
||||
*column_id,
|
||||
request,
|
||||
metrics.as_deref_mut(),
|
||||
)
|
||||
.await?
|
||||
else {
|
||||
continue;
|
||||
@@ -159,9 +255,16 @@ impl FulltextIndexApplier {
|
||||
}
|
||||
}
|
||||
|
||||
if row_ids.is_none() {
|
||||
timer.stop_and_discard();
|
||||
// Record elapsed time to histogram and collect metrics if requested
|
||||
let elapsed = apply_start.elapsed();
|
||||
INDEX_APPLY_ELAPSED
|
||||
.with_label_values(&[TYPE_FULLTEXT_INDEX])
|
||||
.observe(elapsed.as_secs_f64());
|
||||
|
||||
if let Some(m) = metrics {
|
||||
m.apply_elapsed += elapsed;
|
||||
}
|
||||
|
||||
Ok(row_ids)
|
||||
}
|
||||
|
||||
@@ -171,6 +274,7 @@ impl FulltextIndexApplier {
|
||||
file_id: RegionFileId,
|
||||
column_id: ColumnId,
|
||||
request: &FulltextRequest,
|
||||
metrics: Option<&mut FulltextIndexApplyMetrics>,
|
||||
) -> Result<Option<BTreeSet<RowId>>> {
|
||||
let blob_key = format!(
|
||||
"{INDEX_BLOB_TYPE_TANTIVY}-{}",
|
||||
@@ -178,7 +282,7 @@ impl FulltextIndexApplier {
|
||||
);
|
||||
let dir = self
|
||||
.index_source
|
||||
.dir(file_id, &blob_key, file_size_hint)
|
||||
.dir(file_id, &blob_key, file_size_hint, metrics)
|
||||
.await?;
|
||||
|
||||
let dir = match &dir {
|
||||
@@ -240,15 +344,20 @@ impl FulltextIndexApplier {
|
||||
///
|
||||
/// Row group id existing in the returned result means that the row group is searched.
|
||||
/// Empty ranges means that the row group is searched but no rows are found.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `file_id` - The region file ID to apply predicates to
|
||||
/// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
|
||||
/// * `row_groups` - Iterator of row group lengths and whether to search in the row group
|
||||
/// * `metrics` - Optional mutable reference to collect metrics on demand
|
||||
pub async fn apply_coarse(
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
file_size_hint: Option<u64>,
|
||||
row_groups: impl Iterator<Item = (usize, bool)>,
|
||||
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
|
||||
) -> Result<Option<Vec<(usize, Vec<Range<usize>>)>>> {
|
||||
let timer = INDEX_APPLY_ELAPSED
|
||||
.with_label_values(&[TYPE_FULLTEXT_INDEX])
|
||||
.start_timer();
|
||||
let apply_start = Instant::now();
|
||||
|
||||
let (input, mut output) = Self::init_coarse_output(row_groups);
|
||||
let mut applied = false;
|
||||
@@ -266,16 +375,27 @@ impl FulltextIndexApplier {
|
||||
*column_id,
|
||||
&request.terms,
|
||||
&mut output,
|
||||
metrics.as_deref_mut(),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
if !applied {
|
||||
timer.stop_and_discard();
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
Self::adjust_coarse_output(input, &mut output);
|
||||
|
||||
// Record elapsed time to histogram and collect metrics if requested
|
||||
let elapsed = apply_start.elapsed();
|
||||
INDEX_APPLY_ELAPSED
|
||||
.with_label_values(&[TYPE_FULLTEXT_INDEX])
|
||||
.observe(elapsed.as_secs_f64());
|
||||
|
||||
if let Some(m) = metrics {
|
||||
m.apply_elapsed += elapsed;
|
||||
}
|
||||
|
||||
Ok(Some(output))
|
||||
}
|
||||
|
||||
@@ -286,6 +406,7 @@ impl FulltextIndexApplier {
|
||||
column_id: ColumnId,
|
||||
terms: &[FulltextTerm],
|
||||
output: &mut [(usize, Vec<Range<usize>>)],
|
||||
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
|
||||
) -> Result<bool> {
|
||||
let blob_key = format!(
|
||||
"{INDEX_BLOB_TYPE_BLOOM}-{}",
|
||||
@@ -293,7 +414,7 @@ impl FulltextIndexApplier {
|
||||
);
|
||||
let Some(reader) = self
|
||||
.index_source
|
||||
.blob(file_id, &blob_key, file_size_hint)
|
||||
.blob(file_id, &blob_key, file_size_hint, metrics.as_deref_mut())
|
||||
.await?
|
||||
else {
|
||||
return Ok(false);
|
||||
@@ -336,7 +457,13 @@ impl FulltextIndexApplier {
|
||||
}
|
||||
|
||||
*row_group_output = applier
|
||||
.search(&predicates, row_group_output)
|
||||
.search(
|
||||
&predicates,
|
||||
row_group_output,
|
||||
metrics
|
||||
.as_deref_mut()
|
||||
.map(|m| &mut m.bloom_filter_read_metrics),
|
||||
)
|
||||
.await
|
||||
.context(ApplyBloomFilterIndexSnafu)?;
|
||||
}
|
||||
@@ -483,8 +610,15 @@ impl IndexSource {
|
||||
file_id: RegionFileId,
|
||||
key: &str,
|
||||
file_size_hint: Option<u64>,
|
||||
metrics: Option<&mut FulltextIndexApplyMetrics>,
|
||||
) -> Result<Option<GuardWithMetadata<SstPuffinBlob>>> {
|
||||
let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
|
||||
|
||||
// Track cache miss if fallbacked to remote
|
||||
if fallbacked && let Some(m) = metrics {
|
||||
m.blob_cache_miss += 1;
|
||||
}
|
||||
|
||||
let res = reader.blob(key).await;
|
||||
match res {
|
||||
Ok(blob) => Ok(Some(blob)),
|
||||
@@ -514,11 +648,25 @@ impl IndexSource {
|
||||
file_id: RegionFileId,
|
||||
key: &str,
|
||||
file_size_hint: Option<u64>,
|
||||
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
|
||||
) -> Result<Option<GuardWithMetadata<SstPuffinDir>>> {
|
||||
let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
|
||||
|
||||
// Track cache miss if fallbacked to remote
|
||||
if fallbacked && let Some(m) = &mut metrics {
|
||||
m.blob_cache_miss += 1;
|
||||
}
|
||||
|
||||
let start = metrics.as_ref().map(|_| Instant::now());
|
||||
let res = reader.dir(key).await;
|
||||
match res {
|
||||
Ok(dir) => Ok(Some(dir)),
|
||||
Ok((dir, dir_metrics)) => {
|
||||
if let Some(m) = metrics {
|
||||
// Safety: start is Some when metrics is Some
|
||||
m.collect_dir_metrics(start.unwrap().elapsed(), dir_metrics);
|
||||
}
|
||||
Ok(Some(dir))
|
||||
}
|
||||
Err(err) if err.is_blob_not_found() => Ok(None),
|
||||
Err(err) => {
|
||||
if fallbacked {
|
||||
@@ -526,9 +674,16 @@ impl IndexSource {
|
||||
} else {
|
||||
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
|
||||
let reader = self.build_remote(file_id, file_size_hint).await?;
|
||||
let start = metrics.as_ref().map(|_| Instant::now());
|
||||
let res = reader.dir(key).await;
|
||||
match res {
|
||||
Ok(dir) => Ok(Some(dir)),
|
||||
Ok((dir, dir_metrics)) => {
|
||||
if let Some(m) = metrics {
|
||||
// Safety: start is Some when metrics is Some
|
||||
m.collect_dir_metrics(start.unwrap().elapsed(), dir_metrics);
|
||||
}
|
||||
Ok(Some(dir))
|
||||
}
|
||||
Err(err) if err.is_blob_not_found() => Ok(None),
|
||||
Err(err) => Err(err).context(PuffinReadBlobSnafu),
|
||||
}
|
||||
|
||||
@@ -723,15 +723,16 @@ mod tests {
|
||||
let backend = backend.clone();
|
||||
async move {
|
||||
match backend {
|
||||
FulltextBackend::Tantivy => {
|
||||
applier.apply_fine(region_file_id, None).await.unwrap()
|
||||
}
|
||||
FulltextBackend::Tantivy => applier
|
||||
.apply_fine(region_file_id, None, None)
|
||||
.await
|
||||
.unwrap(),
|
||||
FulltextBackend::Bloom => {
|
||||
let coarse_mask = coarse_mask.unwrap_or_default();
|
||||
let row_groups = (0..coarse_mask.len()).map(|i| (1, coarse_mask[i]));
|
||||
// row group id == row id
|
||||
let resp = applier
|
||||
.apply_coarse(region_file_id, None, row_groups)
|
||||
.apply_coarse(region_file_id, None, row_groups, None)
|
||||
.await
|
||||
.unwrap();
|
||||
resp.map(|r| {
|
||||
|
||||
@@ -16,10 +16,11 @@ pub mod builder;
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use common_base::range_read::RangeReader;
|
||||
use common_telemetry::warn;
|
||||
use index::inverted_index::format::reader::InvertedIndexBlobReader;
|
||||
use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReadMetrics};
|
||||
use index::inverted_index::search::index_apply::{
|
||||
ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
|
||||
};
|
||||
@@ -44,6 +45,57 @@ use crate::sst::index::TYPE_INVERTED_INDEX;
|
||||
use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
|
||||
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
|
||||
|
||||
/// Metrics for tracking inverted index apply operations.
|
||||
#[derive(Default, Clone)]
|
||||
pub struct InvertedIndexApplyMetrics {
|
||||
/// Total time spent applying the index.
|
||||
pub apply_elapsed: std::time::Duration,
|
||||
/// Number of blob cache misses (0 or 1).
|
||||
pub blob_cache_miss: usize,
|
||||
/// Total size of blobs read (in bytes).
|
||||
pub blob_read_bytes: u64,
|
||||
/// Metrics for inverted index reads.
|
||||
pub inverted_index_read_metrics: InvertedIndexReadMetrics,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for InvertedIndexApplyMetrics {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{{")?;
|
||||
let mut first = true;
|
||||
|
||||
if !self.apply_elapsed.is_zero() {
|
||||
write!(f, "\"apply_elapsed\":\"{:?}\"", self.apply_elapsed)?;
|
||||
first = false;
|
||||
}
|
||||
if self.blob_cache_miss > 0 {
|
||||
if !first {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
write!(f, "\"blob_cache_miss\":{}", self.blob_cache_miss)?;
|
||||
first = false;
|
||||
}
|
||||
if self.blob_read_bytes > 0 {
|
||||
if !first {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
write!(f, "\"blob_read_bytes\":{}", self.blob_read_bytes)?;
|
||||
}
|
||||
|
||||
write!(f, "}}")
|
||||
}
|
||||
}
|
||||
|
||||
impl InvertedIndexApplyMetrics {
|
||||
/// Merges another metrics into this one.
|
||||
pub fn merge_from(&mut self, other: &Self) {
|
||||
self.apply_elapsed += other.apply_elapsed;
|
||||
self.blob_cache_miss += other.blob_cache_miss;
|
||||
self.blob_read_bytes += other.blob_read_bytes;
|
||||
self.inverted_index_read_metrics
|
||||
.merge_from(&other.inverted_index_read_metrics);
|
||||
}
|
||||
}
|
||||
|
||||
/// `InvertedIndexApplier` is responsible for applying predicates to the provided SST files
|
||||
/// and returning the relevant row group ids for further scan.
|
||||
pub(crate) struct InvertedIndexApplier {
|
||||
@@ -124,24 +176,30 @@ impl InvertedIndexApplier {
|
||||
self
|
||||
}
|
||||
|
||||
/// Applies predicates to the provided SST file id and returns the relevant row group ids
|
||||
/// Applies predicates to the provided SST file id and returns the relevant row group ids.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `file_id` - The region file ID to apply predicates to
|
||||
/// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
|
||||
/// * `metrics` - Optional mutable reference to collect metrics on demand
|
||||
pub async fn apply(
|
||||
&self,
|
||||
file_id: RegionFileId,
|
||||
file_size_hint: Option<u64>,
|
||||
mut metrics: Option<&mut InvertedIndexApplyMetrics>,
|
||||
) -> Result<ApplyOutput> {
|
||||
let _timer = INDEX_APPLY_ELAPSED
|
||||
.with_label_values(&[TYPE_INVERTED_INDEX])
|
||||
.start_timer();
|
||||
let start = Instant::now();
|
||||
|
||||
let context = SearchContext {
|
||||
// Encountering a non-existing column indicates that it doesn't match predicates.
|
||||
index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
|
||||
};
|
||||
|
||||
let mut cache_miss = 0;
|
||||
let blob = match self.cached_blob_reader(file_id, file_size_hint).await {
|
||||
Ok(Some(puffin_reader)) => puffin_reader,
|
||||
other => {
|
||||
cache_miss += 1;
|
||||
if let Err(err) = other {
|
||||
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
|
||||
}
|
||||
@@ -149,8 +207,9 @@ impl InvertedIndexApplier {
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(index_cache) = &self.inverted_index_cache {
|
||||
let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
|
||||
let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
|
||||
|
||||
let result = if let Some(index_cache) = &self.inverted_index_cache {
|
||||
let mut index_reader = CachedInvertedIndexBlobReader::new(
|
||||
file_id.file_id(),
|
||||
blob_size,
|
||||
@@ -158,16 +217,42 @@ impl InvertedIndexApplier {
|
||||
index_cache.clone(),
|
||||
);
|
||||
self.index_applier
|
||||
.apply(context, &mut index_reader)
|
||||
.apply(
|
||||
context,
|
||||
&mut index_reader,
|
||||
metrics
|
||||
.as_deref_mut()
|
||||
.map(|m| &mut m.inverted_index_read_metrics),
|
||||
)
|
||||
.await
|
||||
.context(ApplyInvertedIndexSnafu)
|
||||
} else {
|
||||
let mut index_reader = InvertedIndexBlobReader::new(blob);
|
||||
self.index_applier
|
||||
.apply(context, &mut index_reader)
|
||||
.apply(
|
||||
context,
|
||||
&mut index_reader,
|
||||
metrics
|
||||
.as_deref_mut()
|
||||
.map(|m| &mut m.inverted_index_read_metrics),
|
||||
)
|
||||
.await
|
||||
.context(ApplyInvertedIndexSnafu)
|
||||
};
|
||||
|
||||
// Record elapsed time to histogram and collect metrics if requested
|
||||
let elapsed = start.elapsed();
|
||||
INDEX_APPLY_ELAPSED
|
||||
.with_label_values(&[TYPE_INVERTED_INDEX])
|
||||
.observe(elapsed.as_secs_f64());
|
||||
|
||||
if let Some(metrics) = metrics {
|
||||
metrics.apply_elapsed = elapsed;
|
||||
metrics.blob_cache_miss = cache_miss;
|
||||
metrics.blob_read_bytes = blob_size;
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
/// Creates a blob reader from the cached index file.
|
||||
@@ -281,7 +366,7 @@ mod tests {
|
||||
|
||||
let mut mock_index_applier = MockIndexApplier::new();
|
||||
mock_index_applier.expect_memory_usage().returning(|| 100);
|
||||
mock_index_applier.expect_apply().returning(|_, _| {
|
||||
mock_index_applier.expect_apply().returning(|_, _, _| {
|
||||
Ok(ApplyOutput {
|
||||
matched_segment_ids: Bitmap::new_bitvec(),
|
||||
total_row_count: 100,
|
||||
@@ -297,7 +382,7 @@ mod tests {
|
||||
puffin_manager_factory,
|
||||
Default::default(),
|
||||
);
|
||||
let output = sst_index_applier.apply(file_id, None).await.unwrap();
|
||||
let output = sst_index_applier.apply(file_id, None, None).await.unwrap();
|
||||
assert_eq!(
|
||||
output,
|
||||
ApplyOutput {
|
||||
@@ -345,7 +430,7 @@ mod tests {
|
||||
puffin_manager_factory,
|
||||
Default::default(),
|
||||
);
|
||||
let res = sst_index_applier.apply(file_id, None).await;
|
||||
let res = sst_index_applier.apply(file_id, None, None).await;
|
||||
assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -615,7 +615,7 @@ mod tests {
|
||||
.unwrap();
|
||||
Box::pin(async move {
|
||||
applier
|
||||
.apply(sst_file_id, None)
|
||||
.apply(sst_file_id, None, None)
|
||||
.await
|
||||
.unwrap()
|
||||
.matched_segment_ids
|
||||
|
||||
@@ -245,7 +245,7 @@ mod tests {
|
||||
let bs = blob_reader.read(0..meta.content_length).await.unwrap();
|
||||
assert_eq!(&*bs, raw_data);
|
||||
|
||||
let dir_guard = reader.dir(dir_key).await.unwrap();
|
||||
let (dir_guard, _metrics) = reader.dir(dir_key).await.unwrap();
|
||||
let file = dir_guard.path().join("hello");
|
||||
let data = tokio::fs::read(file).await.unwrap();
|
||||
assert_eq!(data, raw_data);
|
||||
|
||||
@@ -45,6 +45,7 @@ use crate::sst::parquet::format::ReadFormat;
|
||||
use crate::sst::parquet::reader::{
|
||||
FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
|
||||
};
|
||||
use crate::sst::parquet::row_group::ParquetFetchMetrics;
|
||||
|
||||
/// Checks if a row group contains delete operations by examining the min value of op_type column.
|
||||
///
|
||||
@@ -117,11 +118,16 @@ impl FileRange {
|
||||
pub(crate) async fn reader(
|
||||
&self,
|
||||
selector: Option<TimeSeriesRowSelector>,
|
||||
fetch_metrics: Option<&ParquetFetchMetrics>,
|
||||
) -> Result<PruneReader> {
|
||||
let parquet_reader = self
|
||||
.context
|
||||
.reader_builder
|
||||
.build(self.row_group_idx, self.row_selection.clone())
|
||||
.build(
|
||||
self.row_group_idx,
|
||||
self.row_selection.clone(),
|
||||
fetch_metrics,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let use_last_row_reader = if selector
|
||||
@@ -168,11 +174,18 @@ impl FileRange {
|
||||
}
|
||||
|
||||
/// Creates a flat reader that returns RecordBatch.
|
||||
pub(crate) async fn flat_reader(&self) -> Result<FlatPruneReader> {
|
||||
pub(crate) async fn flat_reader(
|
||||
&self,
|
||||
fetch_metrics: Option<&ParquetFetchMetrics>,
|
||||
) -> Result<FlatPruneReader> {
|
||||
let parquet_reader = self
|
||||
.context
|
||||
.reader_builder
|
||||
.build(self.row_group_idx, self.row_selection.clone())
|
||||
.build(
|
||||
self.row_group_idx,
|
||||
self.row_selection.clone(),
|
||||
fetch_metrics,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Compute skip_fields once for this row group
|
||||
|
||||
@@ -52,15 +52,21 @@ use crate::metrics::{
|
||||
use crate::read::prune::{PruneReader, Source};
|
||||
use crate::read::{Batch, BatchReader};
|
||||
use crate::sst::file::FileHandle;
|
||||
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
|
||||
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
|
||||
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
|
||||
use crate::sst::index::bloom_filter::applier::{
|
||||
BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics,
|
||||
};
|
||||
use crate::sst::index::fulltext_index::applier::{
|
||||
FulltextIndexApplierRef, FulltextIndexApplyMetrics,
|
||||
};
|
||||
use crate::sst::index::inverted_index::applier::{
|
||||
InvertedIndexApplierRef, InvertedIndexApplyMetrics,
|
||||
};
|
||||
use crate::sst::parquet::file_range::{
|
||||
FileRangeContext, FileRangeContextRef, PreFilterMode, row_group_contains_delete,
|
||||
};
|
||||
use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
|
||||
use crate::sst::parquet::metadata::MetadataLoader;
|
||||
use crate::sst::parquet::row_group::InMemoryRowGroup;
|
||||
use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics};
|
||||
use crate::sst::parquet::row_selection::RowGroupSelection;
|
||||
use crate::sst::parquet::stats::RowGroupPruningStats;
|
||||
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};
|
||||
@@ -253,7 +259,9 @@ impl ParquetReaderBuilder {
|
||||
let file_size = self.file_handle.meta_ref().file_size;
|
||||
|
||||
// Loads parquet metadata of the file.
|
||||
let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?;
|
||||
let parquet_meta = self
|
||||
.read_parquet_metadata(&file_path, file_size, &mut metrics.metadata_cache_metrics)
|
||||
.await?;
|
||||
// Decodes region metadata.
|
||||
let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
|
||||
// Gets the metadata stored in the SST.
|
||||
@@ -378,25 +386,34 @@ impl ParquetReaderBuilder {
|
||||
&self,
|
||||
file_path: &str,
|
||||
file_size: u64,
|
||||
cache_metrics: &mut MetadataCacheMetrics,
|
||||
) -> Result<Arc<ParquetMetaData>> {
|
||||
let start = Instant::now();
|
||||
let _t = READ_STAGE_ELAPSED
|
||||
.with_label_values(&["read_parquet_metadata"])
|
||||
.start_timer();
|
||||
|
||||
let file_id = self.file_handle.file_id();
|
||||
// Tries to get from global cache.
|
||||
if let Some(metadata) = self.cache_strategy.get_parquet_meta_data(file_id).await {
|
||||
// Tries to get from cache with metrics tracking.
|
||||
if let Some(metadata) = self
|
||||
.cache_strategy
|
||||
.get_parquet_meta_data_with_metrics(file_id, cache_metrics)
|
||||
.await
|
||||
{
|
||||
cache_metrics.metadata_load_cost += start.elapsed();
|
||||
return Ok(metadata);
|
||||
}
|
||||
|
||||
// Cache miss, load metadata directly.
|
||||
let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
|
||||
let metadata = metadata_loader.load().await?;
|
||||
|
||||
let metadata = Arc::new(metadata);
|
||||
// Cache the metadata.
|
||||
self.cache_strategy
|
||||
.put_parquet_meta_data(file_id, metadata.clone());
|
||||
|
||||
cache_metrics.metadata_load_cost += start.elapsed();
|
||||
Ok(metadata)
|
||||
}
|
||||
|
||||
@@ -527,7 +544,11 @@ impl ParquetReaderBuilder {
|
||||
// Slow path: apply the index from the file.
|
||||
let file_size_hint = self.file_handle.meta_ref().index_file_size();
|
||||
let apply_res = index_applier
|
||||
.apply_fine(self.file_handle.file_id(), Some(file_size_hint))
|
||||
.apply_fine(
|
||||
self.file_handle.file_id(),
|
||||
Some(file_size_hint),
|
||||
metrics.fulltext_index_apply_metrics.as_mut(),
|
||||
)
|
||||
.await;
|
||||
let selection = match apply_res {
|
||||
Ok(Some(res)) => {
|
||||
@@ -595,13 +616,17 @@ impl ParquetReaderBuilder {
|
||||
// Slow path: apply the index from the file.
|
||||
let file_size_hint = self.file_handle.meta_ref().index_file_size();
|
||||
let apply_res = index_applier
|
||||
.apply(self.file_handle.file_id(), Some(file_size_hint))
|
||||
.apply(
|
||||
self.file_handle.file_id(),
|
||||
Some(file_size_hint),
|
||||
metrics.inverted_index_apply_metrics.as_mut(),
|
||||
)
|
||||
.await;
|
||||
let selection = match apply_res {
|
||||
Ok(output) => RowGroupSelection::from_inverted_index_apply_output(
|
||||
Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output(
|
||||
row_group_size,
|
||||
num_row_groups,
|
||||
output,
|
||||
apply_output,
|
||||
),
|
||||
Err(err) => {
|
||||
handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
|
||||
@@ -670,7 +695,12 @@ impl ParquetReaderBuilder {
|
||||
)
|
||||
});
|
||||
let apply_res = index_applier
|
||||
.apply(self.file_handle.file_id(), Some(file_size_hint), rgs)
|
||||
.apply(
|
||||
self.file_handle.file_id(),
|
||||
Some(file_size_hint),
|
||||
rgs,
|
||||
metrics.bloom_filter_apply_metrics.as_mut(),
|
||||
)
|
||||
.await;
|
||||
let mut selection = match apply_res {
|
||||
Ok(apply_output) => {
|
||||
@@ -748,7 +778,12 @@ impl ParquetReaderBuilder {
|
||||
)
|
||||
});
|
||||
let apply_res = index_applier
|
||||
.apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs)
|
||||
.apply_coarse(
|
||||
self.file_handle.file_id(),
|
||||
Some(file_size_hint),
|
||||
rgs,
|
||||
metrics.fulltext_index_apply_metrics.as_mut(),
|
||||
)
|
||||
.await;
|
||||
let mut selection = match apply_res {
|
||||
Ok(Some(apply_output)) => {
|
||||
@@ -892,7 +927,7 @@ fn all_required_row_groups_searched(
|
||||
}
|
||||
|
||||
/// Metrics of filtering rows groups and rows.
|
||||
#[derive(Debug, Default, Clone, Copy)]
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub(crate) struct ReaderFilterMetrics {
|
||||
/// Number of row groups before filtering.
|
||||
pub(crate) rg_total: usize,
|
||||
@@ -915,6 +950,13 @@ pub(crate) struct ReaderFilterMetrics {
|
||||
pub(crate) rows_bloom_filtered: usize,
|
||||
/// Number of rows filtered by precise filter.
|
||||
pub(crate) rows_precise_filtered: usize,
|
||||
|
||||
/// Optional metrics for inverted index applier.
|
||||
pub(crate) inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
|
||||
/// Optional metrics for bloom filter index applier.
|
||||
pub(crate) bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
|
||||
/// Optional metrics for fulltext index applier.
|
||||
pub(crate) fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
|
||||
}
|
||||
|
||||
impl ReaderFilterMetrics {
|
||||
@@ -931,6 +973,23 @@ impl ReaderFilterMetrics {
|
||||
self.rows_inverted_filtered += other.rows_inverted_filtered;
|
||||
self.rows_bloom_filtered += other.rows_bloom_filtered;
|
||||
self.rows_precise_filtered += other.rows_precise_filtered;
|
||||
|
||||
// Merge optional applier metrics
|
||||
if let Some(other_metrics) = &other.inverted_index_apply_metrics {
|
||||
self.inverted_index_apply_metrics
|
||||
.get_or_insert_with(Default::default)
|
||||
.merge_from(other_metrics);
|
||||
}
|
||||
if let Some(other_metrics) = &other.bloom_filter_apply_metrics {
|
||||
self.bloom_filter_apply_metrics
|
||||
.get_or_insert_with(Default::default)
|
||||
.merge_from(other_metrics);
|
||||
}
|
||||
if let Some(other_metrics) = &other.fulltext_index_apply_metrics {
|
||||
self.fulltext_index_apply_metrics
|
||||
.get_or_insert_with(Default::default)
|
||||
.merge_from(other_metrics);
|
||||
}
|
||||
}
|
||||
|
||||
/// Reports metrics.
|
||||
@@ -987,6 +1046,77 @@ impl ReaderFilterMetrics {
|
||||
}
|
||||
}
|
||||
|
||||
/// Metrics for parquet metadata cache operations.
|
||||
#[derive(Default, Clone, Copy)]
|
||||
pub(crate) struct MetadataCacheMetrics {
|
||||
/// Number of memory cache hits for parquet metadata.
|
||||
pub(crate) mem_cache_hit: usize,
|
||||
/// Number of memory cache misses for parquet metadata.
|
||||
pub(crate) mem_cache_miss: usize,
|
||||
/// Number of file cache hits for parquet metadata.
|
||||
pub(crate) file_cache_hit: usize,
|
||||
/// Number of file cache misses for parquet metadata.
|
||||
pub(crate) file_cache_miss: usize,
|
||||
/// Duration to load parquet metadata.
|
||||
pub(crate) metadata_load_cost: Duration,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for MetadataCacheMetrics {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{{")?;
|
||||
let mut first = true;
|
||||
|
||||
if self.mem_cache_hit > 0 {
|
||||
write!(f, "\"mem_cache_hit\":{}", self.mem_cache_hit)?;
|
||||
first = false;
|
||||
}
|
||||
if self.mem_cache_miss > 0 {
|
||||
if !first {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
write!(f, "\"mem_cache_miss\":{}", self.mem_cache_miss)?;
|
||||
first = false;
|
||||
}
|
||||
if self.file_cache_hit > 0 {
|
||||
if !first {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
write!(f, "\"file_cache_hit\":{}", self.file_cache_hit)?;
|
||||
first = false;
|
||||
}
|
||||
if self.file_cache_miss > 0 {
|
||||
if !first {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
write!(f, "\"file_cache_miss\":{}", self.file_cache_miss)?;
|
||||
first = false;
|
||||
}
|
||||
if !self.metadata_load_cost.is_zero() {
|
||||
if !first {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
write!(
|
||||
f,
|
||||
"\"metadata_load_cost\":\"{:?}\"",
|
||||
self.metadata_load_cost
|
||||
)?;
|
||||
}
|
||||
|
||||
write!(f, "}}")
|
||||
}
|
||||
}
|
||||
|
||||
impl MetadataCacheMetrics {
|
||||
/// Adds `other` metrics to this metrics.
|
||||
pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
|
||||
self.mem_cache_hit += other.mem_cache_hit;
|
||||
self.mem_cache_miss += other.mem_cache_miss;
|
||||
self.file_cache_hit += other.file_cache_hit;
|
||||
self.file_cache_miss += other.file_cache_miss;
|
||||
self.metadata_load_cost += other.metadata_load_cost;
|
||||
}
|
||||
}
|
||||
|
||||
/// Parquet reader metrics.
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct ReaderMetrics {
|
||||
@@ -1002,6 +1132,10 @@ pub struct ReaderMetrics {
|
||||
pub(crate) num_batches: usize,
|
||||
/// Number of rows read.
|
||||
pub(crate) num_rows: usize,
|
||||
/// Metrics for parquet metadata cache.
|
||||
pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
|
||||
/// Optional metrics for page/row group fetch operations.
|
||||
pub(crate) fetch_metrics: Option<Arc<ParquetFetchMetrics>>,
|
||||
}
|
||||
|
||||
impl ReaderMetrics {
|
||||
@@ -1013,6 +1147,15 @@ impl ReaderMetrics {
|
||||
self.num_record_batches += other.num_record_batches;
|
||||
self.num_batches += other.num_batches;
|
||||
self.num_rows += other.num_rows;
|
||||
self.metadata_cache_metrics
|
||||
.merge_from(&other.metadata_cache_metrics);
|
||||
if let Some(other_fetch) = &other.fetch_metrics {
|
||||
if let Some(self_fetch) = &self.fetch_metrics {
|
||||
self_fetch.merge_from(other_fetch);
|
||||
} else {
|
||||
self.fetch_metrics = Some(other_fetch.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Reports total rows.
|
||||
@@ -1067,7 +1210,10 @@ impl RowGroupReaderBuilder {
|
||||
&self,
|
||||
row_group_idx: usize,
|
||||
row_selection: Option<RowSelection>,
|
||||
fetch_metrics: Option<&ParquetFetchMetrics>,
|
||||
) -> Result<ParquetRecordBatchReader> {
|
||||
let fetch_start = Instant::now();
|
||||
|
||||
let mut row_group = InMemoryRowGroup::create(
|
||||
self.file_handle.region_id(),
|
||||
self.file_handle.file_id().file_id(),
|
||||
@@ -1079,12 +1225,17 @@ impl RowGroupReaderBuilder {
|
||||
);
|
||||
// Fetches data into memory.
|
||||
row_group
|
||||
.fetch(&self.projection, row_selection.as_ref())
|
||||
.fetch(&self.projection, row_selection.as_ref(), fetch_metrics)
|
||||
.await
|
||||
.context(ReadParquetSnafu {
|
||||
path: &self.file_path,
|
||||
})?;
|
||||
|
||||
// Record total fetch elapsed time.
|
||||
if let Some(metrics) = fetch_metrics {
|
||||
metrics.add_total_fetch_elapsed(fetch_start.elapsed().as_micros() as u64);
|
||||
}
|
||||
|
||||
// Builds the parquet reader.
|
||||
// Now the row selection is None.
|
||||
ParquetRecordBatchReader::try_new_with_row_groups(
|
||||
@@ -1228,6 +1379,8 @@ pub struct ParquetReader {
|
||||
selection: RowGroupSelection,
|
||||
/// Reader of current row group.
|
||||
reader_state: ReaderState,
|
||||
/// Metrics for tracking row group fetch operations.
|
||||
fetch_metrics: ParquetFetchMetrics,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -1247,7 +1400,11 @@ impl BatchReader for ParquetReader {
|
||||
let parquet_reader = self
|
||||
.context
|
||||
.reader_builder()
|
||||
.build(row_group_idx, Some(row_selection))
|
||||
.build(
|
||||
row_group_idx,
|
||||
Some(row_selection),
|
||||
Some(&self.fetch_metrics),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Resets the parquet reader.
|
||||
@@ -1303,11 +1460,12 @@ impl ParquetReader {
|
||||
context: FileRangeContextRef,
|
||||
mut selection: RowGroupSelection,
|
||||
) -> Result<Self> {
|
||||
let fetch_metrics = ParquetFetchMetrics::default();
|
||||
// No more items in current row group, reads next row group.
|
||||
let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
|
||||
let parquet_reader = context
|
||||
.reader_builder()
|
||||
.build(row_group_idx, Some(row_selection))
|
||||
.build(row_group_idx, Some(row_selection), Some(&fetch_metrics))
|
||||
.await?;
|
||||
// Compute skip_fields once for this row group
|
||||
let skip_fields = context.should_skip_fields(row_group_idx);
|
||||
@@ -1324,6 +1482,7 @@ impl ParquetReader {
|
||||
context,
|
||||
selection,
|
||||
reader_state,
|
||||
fetch_metrics,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -35,6 +35,206 @@ use crate::cache::{CacheStrategy, PageKey, PageValue};
|
||||
use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
|
||||
use crate::sst::parquet::helper::{MERGE_GAP, fetch_byte_ranges};
|
||||
|
||||
/// Metrics for tracking page/row group fetch operations.
|
||||
/// Uses atomic counters for thread-safe updates.
|
||||
#[derive(Default)]
|
||||
pub struct ParquetFetchMetrics {
|
||||
/// Number of page cache hits.
|
||||
page_cache_hit: std::sync::atomic::AtomicUsize,
|
||||
/// Number of write cache hits.
|
||||
write_cache_hit: std::sync::atomic::AtomicUsize,
|
||||
/// Number of pages to fetch.
|
||||
pages_to_fetch: std::sync::atomic::AtomicUsize,
|
||||
/// Total size in bytes of pages to fetch.
|
||||
page_size_to_fetch: std::sync::atomic::AtomicU64,
|
||||
/// Elapsed time in microseconds fetching from write cache.
|
||||
write_cache_fetch_elapsed: std::sync::atomic::AtomicU64,
|
||||
/// Elapsed time in microseconds fetching from object store.
|
||||
store_fetch_elapsed: std::sync::atomic::AtomicU64,
|
||||
/// Total elapsed time in microseconds for fetching row groups.
|
||||
total_fetch_elapsed: std::sync::atomic::AtomicU64,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for ParquetFetchMetrics {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{{")?;
|
||||
let mut first = true;
|
||||
|
||||
let page_cache_hit = self.page_cache_hit();
|
||||
let write_cache_hit = self.write_cache_hit();
|
||||
let pages_to_fetch = self.pages_to_fetch();
|
||||
let page_size_to_fetch = self.page_size_to_fetch();
|
||||
let write_cache_elapsed = self.write_cache_fetch_elapsed();
|
||||
let store_elapsed = self.store_fetch_elapsed();
|
||||
let total_elapsed = self.total_fetch_elapsed();
|
||||
|
||||
if page_cache_hit > 0 {
|
||||
write!(f, "\"page_cache_hit\":{}", page_cache_hit)?;
|
||||
first = false;
|
||||
}
|
||||
if write_cache_hit > 0 {
|
||||
if !first {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
write!(f, "\"write_cache_hit\":{}", write_cache_hit)?;
|
||||
first = false;
|
||||
}
|
||||
if pages_to_fetch > 0 {
|
||||
if !first {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
write!(f, "\"pages_to_fetch\":{}", pages_to_fetch)?;
|
||||
first = false;
|
||||
}
|
||||
if page_size_to_fetch > 0 {
|
||||
if !first {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
write!(f, "\"page_size_to_fetch\":{}", page_size_to_fetch)?;
|
||||
first = false;
|
||||
}
|
||||
if write_cache_elapsed > 0 {
|
||||
if !first {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
let duration = std::time::Duration::from_micros(write_cache_elapsed);
|
||||
write!(f, "\"write_cache_fetch_elapsed\":\"{:?}\"", duration)?;
|
||||
first = false;
|
||||
}
|
||||
if store_elapsed > 0 {
|
||||
if !first {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
let duration = std::time::Duration::from_micros(store_elapsed);
|
||||
write!(f, "\"store_fetch_elapsed\":\"{:?}\"", duration)?;
|
||||
first = false;
|
||||
}
|
||||
if total_elapsed > 0 {
|
||||
if !first {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
let duration = std::time::Duration::from_micros(total_elapsed);
|
||||
write!(f, "\"total_fetch_elapsed\":\"{:?}\"", duration)?;
|
||||
}
|
||||
|
||||
write!(f, "}}")
|
||||
}
|
||||
}
|
||||
|
||||
impl ParquetFetchMetrics {
|
||||
/// Increments page cache hit counter.
|
||||
pub fn inc_page_cache_hit(&self) {
|
||||
self.page_cache_hit
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Increments write cache hit counter.
|
||||
pub fn inc_write_cache_hit(&self) {
|
||||
self.write_cache_hit
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Adds to the pages to fetch counter.
|
||||
pub fn add_pages_to_fetch(&self, count: usize) {
|
||||
self.pages_to_fetch
|
||||
.fetch_add(count, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Adds to the page size to fetch counter.
|
||||
pub fn add_page_size_to_fetch(&self, size: u64) {
|
||||
self.page_size_to_fetch
|
||||
.fetch_add(size, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Returns the page cache hit count.
|
||||
pub fn page_cache_hit(&self) -> usize {
|
||||
self.page_cache_hit
|
||||
.load(std::sync::atomic::Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Returns the write cache hit count.
|
||||
pub fn write_cache_hit(&self) -> usize {
|
||||
self.write_cache_hit
|
||||
.load(std::sync::atomic::Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Returns the pages to fetch count.
|
||||
pub fn pages_to_fetch(&self) -> usize {
|
||||
self.pages_to_fetch
|
||||
.load(std::sync::atomic::Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Returns the page size to fetch in bytes.
|
||||
pub fn page_size_to_fetch(&self) -> u64 {
|
||||
self.page_size_to_fetch
|
||||
.load(std::sync::atomic::Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Adds elapsed time in microseconds for write cache fetch.
|
||||
pub fn add_write_cache_fetch_elapsed(&self, elapsed_us: u64) {
|
||||
self.write_cache_fetch_elapsed
|
||||
.fetch_add(elapsed_us, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Returns the elapsed time in microseconds for write cache fetch.
|
||||
pub fn write_cache_fetch_elapsed(&self) -> u64 {
|
||||
self.write_cache_fetch_elapsed
|
||||
.load(std::sync::atomic::Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Adds elapsed time in microseconds for object store fetch.
|
||||
pub fn add_store_fetch_elapsed(&self, elapsed_us: u64) {
|
||||
self.store_fetch_elapsed
|
||||
.fetch_add(elapsed_us, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Returns the elapsed time in microseconds for object store fetch.
|
||||
pub fn store_fetch_elapsed(&self) -> u64 {
|
||||
self.store_fetch_elapsed
|
||||
.load(std::sync::atomic::Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Adds elapsed time in microseconds for total fetch operation.
|
||||
pub fn add_total_fetch_elapsed(&self, elapsed_us: u64) {
|
||||
self.total_fetch_elapsed
|
||||
.fetch_add(elapsed_us, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Returns the total elapsed time in microseconds for fetch operations.
|
||||
pub fn total_fetch_elapsed(&self) -> u64 {
|
||||
self.total_fetch_elapsed
|
||||
.load(std::sync::atomic::Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Merges metrics from another [ParquetFetchMetrics].
|
||||
pub fn merge_from(&self, other: &ParquetFetchMetrics) {
|
||||
self.page_cache_hit
|
||||
.fetch_add(other.page_cache_hit(), std::sync::atomic::Ordering::Relaxed);
|
||||
self.write_cache_hit.fetch_add(
|
||||
other.write_cache_hit(),
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
);
|
||||
self.pages_to_fetch
|
||||
.fetch_add(other.pages_to_fetch(), std::sync::atomic::Ordering::Relaxed);
|
||||
self.page_size_to_fetch.fetch_add(
|
||||
other.page_size_to_fetch(),
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
);
|
||||
self.write_cache_fetch_elapsed.fetch_add(
|
||||
other.write_cache_fetch_elapsed(),
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
);
|
||||
self.store_fetch_elapsed.fetch_add(
|
||||
other.store_fetch_elapsed(),
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
);
|
||||
self.total_fetch_elapsed.fetch_add(
|
||||
other.total_fetch_elapsed(),
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct RowGroupBase<'a> {
|
||||
metadata: &'a RowGroupMetaData,
|
||||
pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
|
||||
@@ -244,13 +444,14 @@ impl<'a> InMemoryRowGroup<'a> {
|
||||
&mut self,
|
||||
projection: &ProjectionMask,
|
||||
selection: Option<&RowSelection>,
|
||||
metrics: Option<&ParquetFetchMetrics>,
|
||||
) -> Result<()> {
|
||||
if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
|
||||
let (fetch_ranges, page_start_offsets) =
|
||||
self.base
|
||||
.calc_sparse_read_ranges(projection, offset_index, selection);
|
||||
|
||||
let chunk_data = self.fetch_bytes(&fetch_ranges).await?;
|
||||
let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?;
|
||||
// Assign sparse chunk data to base.
|
||||
self.base
|
||||
.assign_sparse_chunk(projection, chunk_data, page_start_offsets);
|
||||
@@ -268,7 +469,7 @@ impl<'a> InMemoryRowGroup<'a> {
|
||||
}
|
||||
|
||||
// Fetch data with ranges
|
||||
let chunk_data = self.fetch_bytes(&fetch_ranges).await?;
|
||||
let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?;
|
||||
|
||||
// Assigns fetched data to base.
|
||||
self.base.assign_dense_chunk(projection, chunk_data);
|
||||
@@ -279,31 +480,62 @@ impl<'a> InMemoryRowGroup<'a> {
|
||||
|
||||
/// Try to fetch data from the memory cache or the WriteCache,
|
||||
/// if not in WriteCache, fetch data from object store directly.
|
||||
async fn fetch_bytes(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
|
||||
async fn fetch_bytes(
|
||||
&self,
|
||||
ranges: &[Range<u64>],
|
||||
metrics: Option<&ParquetFetchMetrics>,
|
||||
) -> Result<Vec<Bytes>> {
|
||||
// Now fetch page timer includes the whole time to read pages.
|
||||
let _timer = READ_STAGE_FETCH_PAGES.start_timer();
|
||||
|
||||
// Track pages to fetch regardless of cache hit/miss.
|
||||
if let Some(metrics) = metrics {
|
||||
metrics.add_pages_to_fetch(ranges.len());
|
||||
}
|
||||
|
||||
let page_key = PageKey::new(self.file_id, self.row_group_idx, ranges.to_vec());
|
||||
if let Some(pages) = self.cache_strategy.get_pages(&page_key) {
|
||||
if let Some(metrics) = metrics {
|
||||
metrics.inc_page_cache_hit();
|
||||
}
|
||||
return Ok(pages.compressed.clone());
|
||||
}
|
||||
|
||||
// Calculate total range size for metrics.
|
||||
let total_range_size = compute_total_range_size(ranges);
|
||||
if let Some(metrics) = metrics {
|
||||
metrics.add_page_size_to_fetch(total_range_size);
|
||||
}
|
||||
|
||||
let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet);
|
||||
let pages = match self.fetch_ranges_from_write_cache(key, ranges).await {
|
||||
Some(data) => data,
|
||||
let start = std::time::Instant::now();
|
||||
let write_cache_result = self.fetch_ranges_from_write_cache(key, ranges).await;
|
||||
let pages = match write_cache_result {
|
||||
Some(data) => {
|
||||
if let Some(metrics) = metrics {
|
||||
metrics.add_write_cache_fetch_elapsed(start.elapsed().as_micros() as u64);
|
||||
metrics.inc_write_cache_hit();
|
||||
}
|
||||
data
|
||||
}
|
||||
None => {
|
||||
// Fetch data from object store.
|
||||
let _timer = READ_STAGE_ELAPSED
|
||||
.with_label_values(&["cache_miss_read"])
|
||||
.start_timer();
|
||||
|
||||
fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges)
|
||||
let start = std::time::Instant::now();
|
||||
let data = fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges)
|
||||
.await
|
||||
.map_err(|e| ParquetError::External(Box::new(e)))?
|
||||
.map_err(|e| ParquetError::External(Box::new(e)))?;
|
||||
if let Some(metrics) = metrics {
|
||||
metrics.add_store_fetch_elapsed(start.elapsed().as_micros() as u64);
|
||||
}
|
||||
data
|
||||
}
|
||||
};
|
||||
|
||||
// Put pages back to the cache.
|
||||
let total_range_size = compute_total_range_size(ranges);
|
||||
let page_value = PageValue::new(pages.clone(), total_range_size);
|
||||
self.cache_strategy
|
||||
.put_pages(page_key, Arc::new(page_value));
|
||||
|
||||
@@ -32,6 +32,15 @@ use crate::blob_metadata::{BlobMetadata, CompressionCodec};
|
||||
use crate::error::Result;
|
||||
use crate::file_metadata::FileMetadata;
|
||||
|
||||
/// Metrics returned by `PuffinReader::dir` operations.
|
||||
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
|
||||
pub struct DirMetrics {
|
||||
/// Whether this was a cache hit (true) or cache miss (false).
|
||||
pub cache_hit: bool,
|
||||
/// Size of the directory in bytes.
|
||||
pub dir_size: u64,
|
||||
}
|
||||
|
||||
/// The `PuffinManager` trait provides a unified interface for creating `PuffinReader` and `PuffinWriter`.
|
||||
#[async_trait]
|
||||
pub trait PuffinManager {
|
||||
@@ -106,9 +115,10 @@ pub trait PuffinReader {
|
||||
|
||||
/// Reads a directory from the Puffin file.
|
||||
///
|
||||
/// The returned `GuardWithMetadata` is used to access the directory data and its metadata.
|
||||
/// The returned tuple contains `GuardWithMetadata` and `DirMetrics`.
|
||||
/// The `GuardWithMetadata` is used to access the directory data and its metadata.
|
||||
/// Users should hold the `GuardWithMetadata` until they are done with the directory data.
|
||||
async fn dir(&self, key: &str) -> Result<GuardWithMetadata<Self::Dir>>;
|
||||
async fn dir(&self, key: &str) -> Result<(GuardWithMetadata<Self::Dir>, DirMetrics)>;
|
||||
}
|
||||
|
||||
/// `BlobGuard` is provided by the `PuffinReader` to access the blob data.
|
||||
|
||||
@@ -36,7 +36,7 @@ use crate::puffin_manager::file_accessor::PuffinFileAccessor;
|
||||
use crate::puffin_manager::fs_puffin_manager::PuffinMetadataCacheRef;
|
||||
use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata;
|
||||
use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager};
|
||||
use crate::puffin_manager::{BlobGuard, GuardWithMetadata, PuffinReader};
|
||||
use crate::puffin_manager::{BlobGuard, DirMetrics, GuardWithMetadata, PuffinReader};
|
||||
|
||||
/// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files.
|
||||
pub struct FsPuffinReader<S, F>
|
||||
@@ -130,10 +130,10 @@ where
|
||||
Ok(GuardWithMetadata::new(blob, blob_metadata))
|
||||
}
|
||||
|
||||
async fn dir(&self, key: &str) -> Result<GuardWithMetadata<Self::Dir>> {
|
||||
async fn dir(&self, key: &str) -> Result<(GuardWithMetadata<Self::Dir>, DirMetrics)> {
|
||||
let mut file = self.puffin_reader().await?;
|
||||
let blob_metadata = self.get_blob_metadata(key, &mut file).await?;
|
||||
let dir = self
|
||||
let (dir, metrics) = self
|
||||
.stager
|
||||
.get_dir(
|
||||
&self.handle,
|
||||
@@ -153,7 +153,7 @@ where
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(GuardWithMetadata::new(dir, blob_metadata))
|
||||
Ok((GuardWithMetadata::new(dir, blob_metadata), metrics))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ use futures::AsyncWrite;
|
||||
use futures::future::BoxFuture;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::puffin_manager::{BlobGuard, DirGuard};
|
||||
use crate::puffin_manager::{BlobGuard, DirGuard, DirMetrics};
|
||||
|
||||
pub type BoxWriter = Box<dyn AsyncWrite + Unpin + Send>;
|
||||
|
||||
@@ -72,14 +72,15 @@ pub trait Stager: Send + Sync {
|
||||
|
||||
/// Retrieves a directory, initializing it if necessary using the provided `init_fn`.
|
||||
///
|
||||
/// The returned `DirGuard` is used to access the directory in the filesystem.
|
||||
/// The returned tuple contains the `DirGuard` and `DirMetrics`.
|
||||
/// The `DirGuard` is used to access the directory in the filesystem.
|
||||
/// The caller is responsible for holding the `DirGuard` until they are done with the directory.
|
||||
async fn get_dir<'a>(
|
||||
&self,
|
||||
handle: &Self::FileHandle,
|
||||
key: &str,
|
||||
init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
|
||||
) -> Result<Self::Dir>;
|
||||
) -> Result<(Self::Dir, DirMetrics)>;
|
||||
|
||||
/// Stores a directory in the staging area.
|
||||
async fn put_dir(
|
||||
|
||||
@@ -41,7 +41,7 @@ use crate::error::{
|
||||
use crate::puffin_manager::stager::{
|
||||
BoxWriter, DirWriterProvider, InitBlobFn, InitDirFn, Stager, StagerNotifier,
|
||||
};
|
||||
use crate::puffin_manager::{BlobGuard, DirGuard};
|
||||
use crate::puffin_manager::{BlobGuard, DirGuard, DirMetrics};
|
||||
|
||||
const DELETE_QUEUE_SIZE: usize = 10240;
|
||||
const TMP_EXTENSION: &str = "tmp";
|
||||
@@ -203,7 +203,7 @@ impl<H: ToString + Clone + Send + Sync> Stager for BoundedStager<H> {
|
||||
handle: &Self::FileHandle,
|
||||
key: &str,
|
||||
init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
|
||||
) -> Result<Self::Dir> {
|
||||
) -> Result<(Self::Dir, DirMetrics)> {
|
||||
let handle_str = handle.to_string();
|
||||
|
||||
let cache_key = Self::encode_cache_key(&handle_str, key);
|
||||
@@ -242,15 +242,22 @@ impl<H: ToString + Clone + Send + Sync> Stager for BoundedStager<H> {
|
||||
.await
|
||||
.context(CacheGetSnafu)?;
|
||||
|
||||
let dir_size = v.size();
|
||||
if let Some(notifier) = self.notifier.as_ref() {
|
||||
if miss {
|
||||
notifier.on_cache_miss(v.size());
|
||||
notifier.on_cache_miss(dir_size);
|
||||
} else {
|
||||
notifier.on_cache_hit(v.size());
|
||||
notifier.on_cache_hit(dir_size);
|
||||
}
|
||||
}
|
||||
|
||||
let metrics = DirMetrics {
|
||||
cache_hit: !miss,
|
||||
dir_size,
|
||||
};
|
||||
|
||||
match v {
|
||||
CacheValue::Dir(guard) => Ok(guard),
|
||||
CacheValue::Dir(guard) => Ok((guard, metrics)),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
@@ -882,7 +889,7 @@ mod tests {
|
||||
|
||||
let puffin_file_name = "test_get_dir".to_string();
|
||||
let key = "key";
|
||||
let dir_path = stager
|
||||
let (dir_path, metrics) = stager
|
||||
.get_dir(
|
||||
&puffin_file_name,
|
||||
key,
|
||||
@@ -901,6 +908,9 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(!metrics.cache_hit);
|
||||
assert!(metrics.dir_size > 0);
|
||||
|
||||
for (rel_path, content) in &files_in_dir {
|
||||
let file_path = dir_path.path().join(rel_path);
|
||||
let mut file = tokio::fs::File::open(&file_path).await.unwrap();
|
||||
@@ -974,7 +984,7 @@ mod tests {
|
||||
];
|
||||
|
||||
let dir_key = "dir_key";
|
||||
let guard = stager
|
||||
let (guard, _metrics) = stager
|
||||
.get_dir(
|
||||
&puffin_file_name,
|
||||
dir_key,
|
||||
@@ -1016,7 +1026,7 @@ mod tests {
|
||||
let buf = reader.read(0..m.content_length).await.unwrap();
|
||||
assert_eq!(&*buf, b"hello world");
|
||||
|
||||
let dir_path = stager
|
||||
let (dir_path, metrics) = stager
|
||||
.get_dir(
|
||||
&puffin_file_name,
|
||||
dir_key,
|
||||
@@ -1024,6 +1034,9 @@ mod tests {
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(metrics.cache_hit);
|
||||
assert!(metrics.dir_size > 0);
|
||||
for (rel_path, content) in &files_in_dir {
|
||||
let file_path = dir_path.path().join(rel_path);
|
||||
let mut file = tokio::fs::File::open(&file_path).await.unwrap();
|
||||
@@ -1151,7 +1164,7 @@ mod tests {
|
||||
];
|
||||
|
||||
// First time to get the directory
|
||||
let guard_0 = stager
|
||||
let (guard_0, _metrics) = stager
|
||||
.get_dir(
|
||||
&puffin_file_name,
|
||||
dir_key,
|
||||
@@ -1198,7 +1211,7 @@ mod tests {
|
||||
);
|
||||
|
||||
// Second time to get the directory
|
||||
let guard_1 = stager
|
||||
let (guard_1, _metrics) = stager
|
||||
.get_dir(
|
||||
&puffin_file_name,
|
||||
dir_key,
|
||||
@@ -1237,7 +1250,7 @@ mod tests {
|
||||
// Third time to get the directory and all guards are dropped
|
||||
drop(guard_0);
|
||||
drop(guard_1);
|
||||
let guard_2 = stager
|
||||
let (guard_2, _metrics) = stager
|
||||
.get_dir(
|
||||
&puffin_file_name,
|
||||
dir_key,
|
||||
@@ -1390,7 +1403,7 @@ mod tests {
|
||||
];
|
||||
|
||||
let dir_key = "dir_key";
|
||||
let guard = stager
|
||||
let (guard, _metrics) = stager
|
||||
.get_dir(
|
||||
&puffin_file_name,
|
||||
dir_key,
|
||||
|
||||
@@ -356,7 +356,7 @@ async fn check_dir(
|
||||
stager: &BoundedStager<String>,
|
||||
puffin_reader: &impl PuffinReader,
|
||||
) {
|
||||
let res_dir = puffin_reader.dir(key).await.unwrap();
|
||||
let (res_dir, _metrics) = puffin_reader.dir(key).await.unwrap();
|
||||
let metadata = res_dir.metadata();
|
||||
assert_eq!(
|
||||
metadata.properties,
|
||||
|
||||
Reference in New Issue
Block a user