mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-24 17:00:37 +00:00
feat: collect metadata fetch metrics for inverted index
Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
@@ -292,8 +292,10 @@ impl<R: RangeReader> BloomFilterMetaReader<R> {
|
||||
.context(IoSnafu)?;
|
||||
|
||||
if let Some(m) = metrics {
|
||||
m.total_ranges += 2; // suffix read + meta read
|
||||
m.total_bytes += (self.file_size - meta_start) + length;
|
||||
// suffix read + meta read
|
||||
m.total_ranges += 2;
|
||||
// Ignores the meta length size to simplify the calculation.
|
||||
m.total_bytes += self.file_size.min(self.prefetch_size) + length;
|
||||
if let Some(start) = start {
|
||||
m.fetch_elapsed += start.elapsed();
|
||||
}
|
||||
@@ -302,7 +304,8 @@ impl<R: RangeReader> BloomFilterMetaReader<R> {
|
||||
BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
|
||||
} else {
|
||||
if let Some(m) = metrics {
|
||||
m.total_ranges += 1; // suffix read only
|
||||
// suffix read only
|
||||
m.total_ranges += 1;
|
||||
m.total_bytes += self.file_size.min(self.prefetch_size);
|
||||
if let Some(start) = start {
|
||||
m.fetch_elapsed += start.elapsed();
|
||||
|
||||
@@ -75,7 +75,10 @@ pub trait InvertedIndexReader: Send + Sync {
|
||||
}
|
||||
|
||||
/// Retrieves metadata of all inverted indices stored within the blob.
|
||||
async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>>;
|
||||
async fn metadata<'a>(
|
||||
&self,
|
||||
metrics: Option<&'a mut InvertedIndexReadMetrics>,
|
||||
) -> Result<Arc<InvertedIndexMetas>>;
|
||||
|
||||
/// Retrieves the finite state transducer (FST) map from the given offset and size.
|
||||
async fn fst<'a>(
|
||||
|
||||
@@ -95,14 +95,17 @@ impl<R: RangeReader + Sync> InvertedIndexReader for InvertedIndexBlobReader<R> {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>> {
|
||||
async fn metadata<'a>(
|
||||
&self,
|
||||
metrics: Option<&'a mut InvertedIndexReadMetrics>,
|
||||
) -> Result<Arc<InvertedIndexMetas>> {
|
||||
let metadata = self.source.metadata().await.context(CommonIoSnafu)?;
|
||||
let blob_size = metadata.content_length;
|
||||
Self::validate_blob_size(blob_size)?;
|
||||
|
||||
let mut footer_reader = InvertedIndexFooterReader::new(&self.source, blob_size)
|
||||
.with_prefetch_size(DEFAULT_PREFETCH_SIZE);
|
||||
footer_reader.metadata().await.map(Arc::new)
|
||||
footer_reader.metadata(metrics).await.map(Arc::new)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -202,7 +205,7 @@ mod tests {
|
||||
let blob = create_inverted_index_blob();
|
||||
let blob_reader = InvertedIndexBlobReader::new(blob);
|
||||
|
||||
let metas = blob_reader.metadata().await.unwrap();
|
||||
let metas = blob_reader.metadata(None).await.unwrap();
|
||||
assert_eq!(metas.metas.len(), 2);
|
||||
|
||||
let meta0 = metas.metas.get("tag0").unwrap();
|
||||
@@ -229,7 +232,7 @@ mod tests {
|
||||
let blob = create_inverted_index_blob();
|
||||
let blob_reader = InvertedIndexBlobReader::new(blob);
|
||||
|
||||
let metas = blob_reader.metadata().await.unwrap();
|
||||
let metas = blob_reader.metadata(None).await.unwrap();
|
||||
let meta = metas.metas.get("tag0").unwrap();
|
||||
|
||||
let fst_map = blob_reader
|
||||
@@ -263,7 +266,7 @@ mod tests {
|
||||
let blob = create_inverted_index_blob();
|
||||
let blob_reader = InvertedIndexBlobReader::new(blob);
|
||||
|
||||
let metas = blob_reader.metadata().await.unwrap();
|
||||
let metas = blob_reader.metadata(None).await.unwrap();
|
||||
let meta = metas.metas.get("tag0").unwrap();
|
||||
|
||||
let bitmap = blob_reader
|
||||
@@ -277,7 +280,7 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(bitmap, mock_bitmap());
|
||||
|
||||
let metas = blob_reader.metadata().await.unwrap();
|
||||
let metas = blob_reader.metadata(None).await.unwrap();
|
||||
let meta = metas.metas.get("tag1").unwrap();
|
||||
|
||||
let bitmap = blob_reader
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::time::Instant;
|
||||
|
||||
use common_base::range_read::RangeReader;
|
||||
use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
|
||||
use prost::Message;
|
||||
@@ -23,6 +25,7 @@ use crate::inverted_index::error::{
|
||||
UnexpectedZeroSegmentRowCountSnafu,
|
||||
};
|
||||
use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE;
|
||||
use crate::inverted_index::format::reader::InvertedIndexReadMetrics;
|
||||
|
||||
pub const DEFAULT_PREFETCH_SIZE: u64 = 8192; // 8KiB
|
||||
|
||||
@@ -54,12 +57,17 @@ impl<R> InvertedIndexFooterReader<R> {
|
||||
}
|
||||
|
||||
impl<R: RangeReader> InvertedIndexFooterReader<R> {
|
||||
pub async fn metadata(&mut self) -> Result<InvertedIndexMetas> {
|
||||
pub async fn metadata<'a>(
|
||||
&mut self,
|
||||
mut metrics: Option<&'a mut InvertedIndexReadMetrics>,
|
||||
) -> Result<InvertedIndexMetas> {
|
||||
ensure!(
|
||||
self.blob_size >= FOOTER_PAYLOAD_SIZE_SIZE,
|
||||
BlobSizeTooSmallSnafu
|
||||
);
|
||||
|
||||
let start = metrics.as_ref().map(|_| Instant::now());
|
||||
|
||||
let footer_start = self.blob_size.saturating_sub(self.prefetch_size());
|
||||
let suffix = self
|
||||
.source
|
||||
@@ -73,19 +81,36 @@ impl<R: RangeReader> InvertedIndexFooterReader<R> {
|
||||
let footer_size = FOOTER_PAYLOAD_SIZE_SIZE;
|
||||
|
||||
// Did not fetch the entire file metadata in the initial read, need to make a second request.
|
||||
if length > suffix_len as u64 - footer_size {
|
||||
let result = if length > suffix_len as u64 - footer_size {
|
||||
let metadata_start = self.blob_size - length - footer_size;
|
||||
let meta = self
|
||||
.source
|
||||
.read(metadata_start..self.blob_size - footer_size)
|
||||
.await
|
||||
.context(CommonIoSnafu)?;
|
||||
|
||||
if let Some(m) = metrics.as_deref_mut() {
|
||||
m.total_bytes += self.blob_size.min(self.prefetch_size()) + length;
|
||||
m.total_ranges += 2;
|
||||
}
|
||||
|
||||
self.parse_payload(&meta, length)
|
||||
} else {
|
||||
if let Some(m) = metrics.as_deref_mut() {
|
||||
m.total_bytes += self.blob_size.min(self.prefetch_size());
|
||||
m.total_ranges += 1;
|
||||
}
|
||||
|
||||
let metadata_start = self.blob_size - length - footer_size - footer_start;
|
||||
let meta = &suffix[metadata_start as usize..suffix_len - footer_size as usize];
|
||||
self.parse_payload(meta, length)
|
||||
};
|
||||
|
||||
if let Some(m) = metrics {
|
||||
m.fetch_elapsed += start.unwrap().elapsed();
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> {
|
||||
@@ -186,7 +211,7 @@ mod tests {
|
||||
reader = reader.with_prefetch_size(prefetch);
|
||||
}
|
||||
|
||||
let metas = reader.metadata().await.unwrap();
|
||||
let metas = reader.metadata(None).await.unwrap();
|
||||
assert_eq!(metas.metas.len(), 1);
|
||||
let index_meta = &metas.metas.get("test").unwrap();
|
||||
assert_eq!(index_meta.name, "test");
|
||||
@@ -210,7 +235,7 @@ mod tests {
|
||||
reader = reader.with_prefetch_size(prefetch);
|
||||
}
|
||||
|
||||
let result = reader.metadata().await;
|
||||
let result = reader.metadata(None).await;
|
||||
assert_matches!(result, Err(Error::UnexpectedFooterPayloadSize { .. }));
|
||||
}
|
||||
}
|
||||
@@ -233,7 +258,7 @@ mod tests {
|
||||
reader = reader.with_prefetch_size(prefetch);
|
||||
}
|
||||
|
||||
let result = reader.metadata().await;
|
||||
let result = reader.metadata(None).await;
|
||||
assert_matches!(result, Err(Error::UnexpectedOffsetSize { .. }));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -122,7 +122,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let reader = InvertedIndexBlobReader::new(blob);
|
||||
let metadata = reader.metadata().await.unwrap();
|
||||
let metadata = reader.metadata(None).await.unwrap();
|
||||
assert_eq!(metadata.total_row_count, 8);
|
||||
assert_eq!(metadata.segment_row_count, 1);
|
||||
assert_eq!(metadata.metas.len(), 0);
|
||||
@@ -182,7 +182,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let reader = InvertedIndexBlobReader::new(blob);
|
||||
let metadata = reader.metadata().await.unwrap();
|
||||
let metadata = reader.metadata(None).await.unwrap();
|
||||
assert_eq!(metadata.total_row_count, 8);
|
||||
assert_eq!(metadata.segment_row_count, 1);
|
||||
assert_eq!(metadata.metas.len(), 2);
|
||||
|
||||
@@ -50,7 +50,7 @@ impl IndexApplier for PredicatesIndexApplier {
|
||||
metrics: Option<&'b mut InvertedIndexReadMetrics>,
|
||||
) -> Result<ApplyOutput> {
|
||||
let mut metrics = metrics;
|
||||
let metadata = reader.metadata().await?;
|
||||
let metadata = reader.metadata(metrics.as_deref_mut()).await?;
|
||||
let mut output = ApplyOutput {
|
||||
matched_segment_ids: Bitmap::new_bitvec(),
|
||||
total_row_count: metadata.total_row_count as _,
|
||||
@@ -225,7 +225,7 @@ mod tests {
|
||||
let mut mock_reader = MockInvertedIndexReader::new();
|
||||
mock_reader
|
||||
.expect_metadata()
|
||||
.returning(|| Ok(mock_metas([("tag-0", 0)])));
|
||||
.returning(|_| Ok(mock_metas([("tag-0", 0)])));
|
||||
mock_reader.expect_fst_vec().returning(|_ranges, _metrics| {
|
||||
Ok(vec![
|
||||
FstMap::from_iter([(b"tag-0_value-0", fst_value(2, 1))]).unwrap(),
|
||||
@@ -258,7 +258,7 @@ mod tests {
|
||||
let mut mock_reader = MockInvertedIndexReader::new();
|
||||
mock_reader
|
||||
.expect_metadata()
|
||||
.returning(|| Ok(mock_metas([("tag-0", 0)])));
|
||||
.returning(|_| Ok(mock_metas([("tag-0", 0)])));
|
||||
mock_reader.expect_fst_vec().returning(|_range, _metrics| {
|
||||
Ok(vec![
|
||||
FstMap::from_iter([(b"tag-0_value-1", fst_value(2, 1))]).unwrap(),
|
||||
@@ -285,7 +285,7 @@ mod tests {
|
||||
let mut mock_reader = MockInvertedIndexReader::new();
|
||||
mock_reader
|
||||
.expect_metadata()
|
||||
.returning(|| Ok(mock_metas([("tag-0", 0), ("tag-1", 1)])));
|
||||
.returning(|_| Ok(mock_metas([("tag-0", 0), ("tag-1", 1)])));
|
||||
mock_reader.expect_fst_vec().returning(|ranges, _metrics| {
|
||||
let mut output = vec![];
|
||||
for range in ranges {
|
||||
@@ -339,7 +339,7 @@ mod tests {
|
||||
let mut mock_reader: MockInvertedIndexReader = MockInvertedIndexReader::new();
|
||||
mock_reader
|
||||
.expect_metadata()
|
||||
.returning(|| Ok(mock_metas([("tag-0", 0)])));
|
||||
.returning(|_| Ok(mock_metas([("tag-0", 0)])));
|
||||
|
||||
let output = applier
|
||||
.apply(SearchContext::default(), &mut mock_reader, None)
|
||||
@@ -351,7 +351,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_index_applier_with_empty_index() {
|
||||
let mut mock_reader = MockInvertedIndexReader::new();
|
||||
mock_reader.expect_metadata().returning(move || {
|
||||
mock_reader.expect_metadata().returning(move |_| {
|
||||
Ok(Arc::new(InvertedIndexMetas {
|
||||
total_row_count: 0, // No rows
|
||||
segment_row_count: 1,
|
||||
@@ -378,7 +378,7 @@ mod tests {
|
||||
let mut mock_reader = MockInvertedIndexReader::new();
|
||||
mock_reader
|
||||
.expect_metadata()
|
||||
.returning(|| Ok(mock_metas(vec![])));
|
||||
.returning(|_| Ok(mock_metas(vec![])));
|
||||
|
||||
let mut mock_fst_applier = MockFstApplier::new();
|
||||
mock_fst_applier.expect_apply().never();
|
||||
|
||||
9
src/mito2/src/cache/index/inverted_index.rs
vendored
9
src/mito2/src/cache/index/inverted_index.rs
vendored
@@ -146,12 +146,15 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
|
||||
Ok(pages)
|
||||
}
|
||||
|
||||
async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>> {
|
||||
async fn metadata<'a>(
|
||||
&self,
|
||||
metrics: Option<&'a mut InvertedIndexReadMetrics>,
|
||||
) -> Result<Arc<InvertedIndexMetas>> {
|
||||
if let Some(cached) = self.cache.get_metadata(self.file_id) {
|
||||
CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
|
||||
Ok(cached)
|
||||
} else {
|
||||
let meta = self.inner.metadata().await?;
|
||||
let meta = self.inner.metadata(metrics).await?;
|
||||
self.cache.put_metadata(self.file_id, meta.clone());
|
||||
CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
|
||||
Ok(meta)
|
||||
@@ -306,7 +309,7 @@ mod test {
|
||||
reader,
|
||||
Arc::new(InvertedIndexCache::new(8192, 8192, 50)),
|
||||
);
|
||||
let metadata = cached_reader.metadata().await.unwrap();
|
||||
let metadata = cached_reader.metadata(None).await.unwrap();
|
||||
assert_eq!(metadata.total_row_count, 8);
|
||||
assert_eq!(metadata.segment_row_count, 1);
|
||||
assert_eq!(metadata.metas.len(), 2);
|
||||
|
||||
@@ -233,7 +233,7 @@ async fn collect_inverted_entries(
|
||||
InvertedIndexBlobReader::new(blob_reader),
|
||||
cache.clone(),
|
||||
);
|
||||
match reader.metadata().await {
|
||||
match reader.metadata(None).await {
|
||||
Ok(metas) => metas,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
@@ -247,7 +247,7 @@ async fn collect_inverted_entries(
|
||||
}
|
||||
} else {
|
||||
let reader = InvertedIndexBlobReader::new(blob_reader);
|
||||
match reader.metadata().await {
|
||||
match reader.metadata(None).await {
|
||||
Ok(metas) => metas,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
|
||||
Reference in New Issue
Block a user