mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat: add metrics for range_read and metadata
Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
@@ -38,7 +38,7 @@ pub struct BloomFilterApplier {
|
||||
|
||||
impl BloomFilterApplier {
|
||||
pub async fn new(reader: Box<dyn BloomFilterReader + Send>) -> Result<Self> {
|
||||
let meta = reader.metadata().await?;
|
||||
let meta = reader.metadata(None).await?;
|
||||
|
||||
Ok(Self { reader, meta })
|
||||
}
|
||||
|
||||
@@ -91,7 +91,12 @@ pub fn bytes_to_u64_vec(bytes: &Bytes) -> Vec<u64> {
|
||||
#[async_trait]
|
||||
pub trait BloomFilterReader: Sync {
|
||||
/// Reads range of bytes from the file.
|
||||
async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes>;
|
||||
async fn range_read(
|
||||
&self,
|
||||
offset: u64,
|
||||
size: u32,
|
||||
metrics: Option<&mut BloomFilterReadMetrics>,
|
||||
) -> Result<Bytes>;
|
||||
|
||||
/// Reads bunch of ranges from the file.
|
||||
async fn read_vec(
|
||||
@@ -104,7 +109,7 @@ pub trait BloomFilterReader: Sync {
|
||||
let mut results = Vec::with_capacity(ranges.len());
|
||||
for range in ranges {
|
||||
let size = (range.end - range.start) as u32;
|
||||
let data = self.range_read(range.start, size).await?;
|
||||
let data = self.range_read(range.start, size, None).await?;
|
||||
results.push(data);
|
||||
}
|
||||
|
||||
@@ -120,11 +125,18 @@ pub trait BloomFilterReader: Sync {
|
||||
}
|
||||
|
||||
/// Reads the meta information of the bloom filter.
|
||||
async fn metadata(&self) -> Result<BloomFilterMeta>;
|
||||
async fn metadata(
|
||||
&self,
|
||||
metrics: Option<&mut BloomFilterReadMetrics>,
|
||||
) -> Result<BloomFilterMeta>;
|
||||
|
||||
/// Reads a bloom filter with the given location.
|
||||
async fn bloom_filter(&self, loc: &BloomFilterLoc) -> Result<BloomFilter> {
|
||||
let bytes = self.range_read(loc.offset, loc.size as _).await?;
|
||||
async fn bloom_filter(
|
||||
&self,
|
||||
loc: &BloomFilterLoc,
|
||||
metrics: Option<&mut BloomFilterReadMetrics>,
|
||||
) -> Result<BloomFilter> {
|
||||
let bytes = self.range_read(loc.offset, loc.size as _, metrics).await?;
|
||||
let vec = bytes_to_u64_vec(&bytes);
|
||||
let bm = BloomFilter::from_vec(vec)
|
||||
.seed(&SEED)
|
||||
@@ -171,11 +183,28 @@ impl<R: RangeReader> BloomFilterReaderImpl<R> {
|
||||
|
||||
#[async_trait]
|
||||
impl<R: RangeReader> BloomFilterReader for BloomFilterReaderImpl<R> {
|
||||
async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes> {
|
||||
self.reader
|
||||
async fn range_read(
|
||||
&self,
|
||||
offset: u64,
|
||||
size: u32,
|
||||
metrics: Option<&mut BloomFilterReadMetrics>,
|
||||
) -> Result<Bytes> {
|
||||
let start = metrics.as_ref().map(|_| Instant::now());
|
||||
let result = self
|
||||
.reader
|
||||
.read(offset..offset + size as u64)
|
||||
.await
|
||||
.context(IoSnafu)
|
||||
.context(IoSnafu)?;
|
||||
|
||||
if let Some(m) = metrics {
|
||||
m.total_ranges += 1;
|
||||
m.total_bytes += size as u64;
|
||||
if let Some(start) = start {
|
||||
m.elapsed += start.elapsed();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn read_vec(
|
||||
@@ -197,13 +226,16 @@ impl<R: RangeReader> BloomFilterReader for BloomFilterReaderImpl<R> {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn metadata(&self) -> Result<BloomFilterMeta> {
|
||||
async fn metadata(
|
||||
&self,
|
||||
metrics: Option<&mut BloomFilterReadMetrics>,
|
||||
) -> Result<BloomFilterMeta> {
|
||||
let metadata = self.reader.metadata().await.context(IoSnafu)?;
|
||||
let file_size = metadata.content_length;
|
||||
|
||||
let mut meta_reader =
|
||||
BloomFilterMetaReader::new(&self.reader, file_size, Some(DEFAULT_PREFETCH_SIZE));
|
||||
meta_reader.metadata().await
|
||||
meta_reader.metadata(metrics).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -229,7 +261,10 @@ impl<R: RangeReader> BloomFilterMetaReader<R> {
|
||||
///
|
||||
/// It will first prefetch some bytes from the end of the file,
|
||||
/// then parse the metadata from the prefetch bytes.
|
||||
pub async fn metadata(&mut self) -> Result<BloomFilterMeta> {
|
||||
pub async fn metadata(
|
||||
&mut self,
|
||||
metrics: Option<&mut BloomFilterReadMetrics>,
|
||||
) -> Result<BloomFilterMeta> {
|
||||
ensure!(
|
||||
self.file_size >= BLOOM_META_LEN_SIZE,
|
||||
FileSizeTooSmallSnafu {
|
||||
@@ -237,6 +272,7 @@ impl<R: RangeReader> BloomFilterMetaReader<R> {
|
||||
}
|
||||
);
|
||||
|
||||
let start = metrics.as_ref().map(|_| Instant::now());
|
||||
let meta_start = self.file_size.saturating_sub(self.prefetch_size);
|
||||
let suffix = self
|
||||
.reader
|
||||
@@ -254,8 +290,25 @@ impl<R: RangeReader> BloomFilterMetaReader<R> {
|
||||
.read(metadata_start..self.file_size - BLOOM_META_LEN_SIZE)
|
||||
.await
|
||||
.context(IoSnafu)?;
|
||||
|
||||
if let Some(m) = metrics {
|
||||
m.total_ranges += 2; // suffix read + meta read
|
||||
m.total_bytes += (self.file_size - meta_start) + length;
|
||||
if let Some(start) = start {
|
||||
m.elapsed += start.elapsed();
|
||||
}
|
||||
}
|
||||
|
||||
BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
|
||||
} else {
|
||||
if let Some(m) = metrics {
|
||||
m.total_ranges += 1; // suffix read only
|
||||
m.total_bytes += self.file_size.min(self.prefetch_size);
|
||||
if let Some(start) = start {
|
||||
m.elapsed += start.elapsed();
|
||||
}
|
||||
}
|
||||
|
||||
let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE - meta_start;
|
||||
let meta = &suffix[metadata_start as usize..suffix_len - BLOOM_META_LEN_SIZE as usize];
|
||||
BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
|
||||
@@ -336,7 +389,7 @@ mod tests {
|
||||
for prefetch in [0u64, file_size / 2, file_size, file_size + 10] {
|
||||
let mut reader =
|
||||
BloomFilterMetaReader::new(bytes.clone(), file_size as _, Some(prefetch));
|
||||
let meta = reader.metadata().await.unwrap();
|
||||
let meta = reader.metadata(None).await.unwrap();
|
||||
|
||||
assert_eq!(meta.rows_per_segment, 2);
|
||||
assert_eq!(meta.segment_count, 2);
|
||||
@@ -358,11 +411,11 @@ mod tests {
|
||||
let bytes = mock_bloom_filter_bytes().await;
|
||||
|
||||
let reader = BloomFilterReaderImpl::new(bytes);
|
||||
let meta = reader.metadata().await.unwrap();
|
||||
let meta = reader.metadata(None).await.unwrap();
|
||||
|
||||
assert_eq!(meta.bloom_filter_locs.len(), 2);
|
||||
let bf = reader
|
||||
.bloom_filter(&meta.bloom_filter_locs[0])
|
||||
.bloom_filter(&meta.bloom_filter_locs[0], None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(bf.contains(&b"a"));
|
||||
@@ -371,7 +424,7 @@ mod tests {
|
||||
assert!(bf.contains(&b"d"));
|
||||
|
||||
let bf = reader
|
||||
.bloom_filter(&meta.bloom_filter_locs[1])
|
||||
.bloom_filter(&meta.bloom_filter_locs[1], None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(bf.contains(&b"e"));
|
||||
|
||||
30
src/mito2/src/cache/index/bloom_filter_index.rs
vendored
30
src/mito2/src/cache/index/bloom_filter_index.rs
vendored
@@ -115,9 +115,16 @@ impl<R> CachedBloomFilterIndexBlobReader<R> {
|
||||
|
||||
#[async_trait]
|
||||
impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBlobReader<R> {
|
||||
async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes> {
|
||||
async fn range_read(
|
||||
&self,
|
||||
offset: u64,
|
||||
size: u32,
|
||||
metrics: Option<&mut BloomFilterReadMetrics>,
|
||||
) -> Result<Bytes> {
|
||||
let start = metrics.as_ref().map(|_| Instant::now());
|
||||
let inner = &self.inner;
|
||||
self.cache
|
||||
let result = self
|
||||
.cache
|
||||
.get_or_load(
|
||||
(self.file_id, self.column_id, self.tag),
|
||||
self.blob_size,
|
||||
@@ -126,7 +133,17 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
|
||||
move |ranges| async move { inner.read_vec(&ranges, None).await },
|
||||
)
|
||||
.await
|
||||
.map(|b| b.into())
|
||||
.map(|b| b.into())?;
|
||||
|
||||
if let Some(m) = metrics {
|
||||
m.total_ranges += 1;
|
||||
m.total_bytes += size as u64;
|
||||
if let Some(start) = start {
|
||||
m.elapsed += start.elapsed();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn read_vec(
|
||||
@@ -165,7 +182,10 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
|
||||
}
|
||||
|
||||
/// Reads the meta information of the bloom filter.
|
||||
async fn metadata(&self) -> Result<BloomFilterMeta> {
|
||||
async fn metadata(
|
||||
&self,
|
||||
metrics: Option<&mut BloomFilterReadMetrics>,
|
||||
) -> Result<BloomFilterMeta> {
|
||||
if let Some(cached) = self
|
||||
.cache
|
||||
.get_metadata((self.file_id, self.column_id, self.tag))
|
||||
@@ -173,7 +193,7 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
|
||||
CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
|
||||
Ok((*cached).clone())
|
||||
} else {
|
||||
let meta = self.inner.metadata().await?;
|
||||
let meta = self.inner.metadata(metrics).await?;
|
||||
self.cache.put_metadata(
|
||||
(self.file_id, self.column_id, self.tag),
|
||||
Arc::new(meta.clone()),
|
||||
|
||||
@@ -318,10 +318,10 @@ async fn try_read_bloom_meta(
|
||||
bloom_reader,
|
||||
cache.clone(),
|
||||
)
|
||||
.metadata()
|
||||
.metadata(None)
|
||||
.await
|
||||
}
|
||||
_ => bloom_reader.metadata().await,
|
||||
_ => bloom_reader.metadata(None).await,
|
||||
};
|
||||
|
||||
match result {
|
||||
|
||||
@@ -637,17 +637,17 @@ pub(crate) mod tests {
|
||||
.unwrap();
|
||||
let reader = blob_guard.reader().await.unwrap();
|
||||
let bloom_filter = BloomFilterReaderImpl::new(reader);
|
||||
let metadata = bloom_filter.metadata().await.unwrap();
|
||||
let metadata = bloom_filter.metadata(None).await.unwrap();
|
||||
|
||||
assert_eq!(metadata.segment_count, 10);
|
||||
for i in 0..5 {
|
||||
let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
|
||||
let bf = bloom_filter.bloom_filter(loc).await.unwrap();
|
||||
let bf = bloom_filter.bloom_filter(loc, None).await.unwrap();
|
||||
assert!(bf.contains(b"tag1"));
|
||||
}
|
||||
for i in 5..10 {
|
||||
let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
|
||||
let bf = bloom_filter.bloom_filter(loc).await.unwrap();
|
||||
let bf = bloom_filter.bloom_filter(loc, None).await.unwrap();
|
||||
assert!(bf.contains(b"tag2"));
|
||||
}
|
||||
}
|
||||
@@ -662,13 +662,13 @@ pub(crate) mod tests {
|
||||
.unwrap();
|
||||
let reader = blob_guard.reader().await.unwrap();
|
||||
let bloom_filter = BloomFilterReaderImpl::new(reader);
|
||||
let metadata = bloom_filter.metadata().await.unwrap();
|
||||
let metadata = bloom_filter.metadata(None).await.unwrap();
|
||||
|
||||
assert_eq!(metadata.segment_count, 5);
|
||||
for i in 0u64..20 {
|
||||
let idx = i as usize / 4;
|
||||
let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[idx] as usize];
|
||||
let bf = bloom_filter.bloom_filter(loc).await.unwrap();
|
||||
let bf = bloom_filter.bloom_filter(loc, None).await.unwrap();
|
||||
let mut buf = vec![];
|
||||
IndexValueCodec::encode_nonnull_value(ValueRef::UInt64(i), &sort_field, &mut buf)
|
||||
.unwrap();
|
||||
|
||||
Reference in New Issue
Block a user