mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 04:12:55 +00:00
feat: collect per file metrics
Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
@@ -14,7 +14,7 @@
|
||||
|
||||
//! Utilities for scanners.
|
||||
|
||||
use std::collections::VecDeque;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::fmt;
|
||||
use std::pin::Pin;
|
||||
use std::sync::{Arc, Mutex};
|
||||
@@ -42,7 +42,7 @@ use crate::read::merge::{MergeMetrics, MergeMetricsReport};
|
||||
use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
|
||||
use crate::read::scan_region::StreamContext;
|
||||
use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
|
||||
use crate::sst::file::FileTimeRange;
|
||||
use crate::sst::file::{FileTimeRange, RegionFileId};
|
||||
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
|
||||
use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
|
||||
use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
|
||||
@@ -52,6 +52,39 @@ use crate::sst::parquet::flat_format::time_index_column_index;
|
||||
use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
|
||||
use crate::sst::parquet::row_group::ParquetFetchMetrics;
|
||||
|
||||
/// Per-file scan metrics tracked when explain_verbose is enabled.
#[derive(Default, Clone)]
pub(crate) struct FileMetrics {
    /// Number of ranges (row groups) read from this file.
    pub(crate) num_ranges: usize,
    /// Number of rows read from this file.
    pub(crate) num_rows: usize,
    /// Time spent building readers for this file (accumulated across all ranges).
    pub(crate) build_cost: Duration,
    /// Time spent scanning this file (accumulated across all ranges).
    pub(crate) scan_cost: Duration,
}

impl fmt::Debug for FileMetrics {
    /// Renders the metrics as a compact JSON-like object so they can be
    /// embedded into the verbose explain output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Destructure so a newly added field triggers a compile error here,
        // reminding us to include it in the output.
        let FileMetrics {
            num_ranges,
            num_rows,
            build_cost,
            scan_cost,
        } = self;
        write!(
            f,
            "{{\"num_ranges\":{num_ranges}, \"num_rows\":{num_rows}, \"build_cost\":\"{build_cost:?}\", \"scan_cost\":\"{scan_cost:?}\"}}"
        )
    }
}

impl FileMetrics {
    /// Merges another [`FileMetrics`] into this one by accumulating every
    /// counter and cost.
    pub(crate) fn merge_from(&mut self, other: &FileMetrics) {
        let FileMetrics {
            num_ranges,
            num_rows,
            build_cost,
            scan_cost,
        } = other;
        self.num_ranges += *num_ranges;
        self.num_rows += *num_rows;
        self.build_cost += *build_cost;
        self.scan_cost += *scan_cost;
    }
}
|
||||
|
||||
/// Verbose scan metrics for a partition.
|
||||
#[derive(Default)]
|
||||
pub(crate) struct ScanMetricsSet {
|
||||
@@ -151,6 +184,8 @@ pub(crate) struct ScanMetricsSet {
|
||||
fetch_metrics: Option<ParquetFetchMetrics>,
|
||||
/// Metadata cache metrics.
|
||||
metadata_cache_metrics: Option<MetadataCacheMetrics>,
|
||||
/// Per-file scan metrics, only populated when explain_verbose is true.
|
||||
per_file_metrics: Option<HashMap<RegionFileId, FileMetrics>>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for ScanMetricsSet {
|
||||
@@ -199,6 +234,7 @@ impl fmt::Debug for ScanMetricsSet {
|
||||
fulltext_index_apply_metrics,
|
||||
fetch_metrics,
|
||||
metadata_cache_metrics,
|
||||
per_file_metrics,
|
||||
} = self;
|
||||
|
||||
// Write core metrics
|
||||
@@ -326,6 +362,20 @@ impl fmt::Debug for ScanMetricsSet {
|
||||
write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
|
||||
}
|
||||
|
||||
// Write per-file metrics if present and non-empty
|
||||
if let Some(file_metrics) = per_file_metrics
|
||||
&& !file_metrics.is_empty()
|
||||
{
|
||||
write!(f, ", \"per_file_metrics\": {{")?;
|
||||
for (i, (file_id, metrics)) in file_metrics.iter().enumerate() {
|
||||
if i > 0 {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
write!(f, "\"{}\": {:?}", file_id, metrics)?;
|
||||
}
|
||||
write!(f, "}}")?;
|
||||
}
|
||||
|
||||
write!(f, ", \"stream_eof\":{stream_eof}}}")
|
||||
}
|
||||
}
|
||||
@@ -432,6 +482,17 @@ impl ScanMetricsSet {
|
||||
.merge_from(metadata_cache_metrics);
|
||||
}
|
||||
|
||||
/// Merges per-file metrics.
|
||||
fn merge_per_file_metrics(&mut self, other: &HashMap<RegionFileId, FileMetrics>) {
|
||||
let self_file_metrics = self.per_file_metrics.get_or_insert_with(HashMap::new);
|
||||
for (file_id, metrics) in other {
|
||||
self_file_metrics
|
||||
.entry(*file_id)
|
||||
.or_insert_with(FileMetrics::default)
|
||||
.merge_from(metrics);
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets distributor metrics.
|
||||
fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
|
||||
let SeriesDistributorMetrics {
|
||||
@@ -722,11 +783,20 @@ impl PartitionMetrics {
|
||||
}
|
||||
|
||||
/// Merges [ReaderMetrics] and `build_reader_cost`.
|
||||
pub fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
|
||||
pub fn merge_reader_metrics(
|
||||
&self,
|
||||
metrics: &ReaderMetrics,
|
||||
per_file_metrics: Option<&HashMap<RegionFileId, FileMetrics>>,
|
||||
) {
|
||||
self.0.build_parts_cost.add_duration(metrics.build_cost);
|
||||
|
||||
let mut metrics_set = self.0.metrics.lock().unwrap();
|
||||
metrics_set.merge_reader_metrics(metrics);
|
||||
|
||||
// Merge per-file metrics if provided
|
||||
if let Some(file_metrics) = per_file_metrics {
|
||||
metrics_set.merge_per_file_metrics(file_metrics);
|
||||
}
|
||||
}
|
||||
|
||||
/// Finishes the query.
|
||||
@@ -938,7 +1008,7 @@ pub(crate) async fn scan_file_ranges(
|
||||
.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
|
||||
.await?;
|
||||
part_metrics.inc_num_file_ranges(ranges.len());
|
||||
part_metrics.merge_reader_metrics(&reader_metrics);
|
||||
part_metrics.merge_reader_metrics(&reader_metrics, None);
|
||||
|
||||
Ok(build_file_range_scan_stream(
|
||||
stream_ctx,
|
||||
@@ -964,7 +1034,7 @@ pub(crate) async fn scan_flat_file_ranges(
|
||||
.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
|
||||
.await?;
|
||||
part_metrics.inc_num_file_ranges(ranges.len());
|
||||
part_metrics.merge_reader_metrics(&reader_metrics);
|
||||
part_metrics.merge_reader_metrics(&reader_metrics, None);
|
||||
|
||||
Ok(build_flat_file_range_scan_stream(
|
||||
stream_ctx,
|
||||
@@ -987,6 +1057,11 @@ pub fn build_file_range_scan_stream(
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let mut per_file_metrics = if part_metrics.explain_verbose() {
|
||||
Some(HashMap::<RegionFileId, FileMetrics>::new())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let reader_metrics = &mut ReaderMetrics {
|
||||
fetch_metrics: fetch_metrics.clone(),
|
||||
..Default::default()
|
||||
@@ -1006,6 +1081,20 @@ pub fn build_file_range_scan_stream(
|
||||
}
|
||||
if let Source::PruneReader(reader) = source {
|
||||
let prune_metrics = reader.metrics();
|
||||
|
||||
// Update per-file metrics if tracking is enabled
|
||||
if let Some(file_metrics_map) = per_file_metrics.as_mut() {
|
||||
let file_id = range.file_handle().file_id();
|
||||
let file_metrics = file_metrics_map
|
||||
.entry(file_id)
|
||||
.or_insert_with(FileMetrics::default);
|
||||
|
||||
file_metrics.num_ranges += 1;
|
||||
file_metrics.num_rows += prune_metrics.num_rows;
|
||||
file_metrics.build_cost += build_cost;
|
||||
file_metrics.scan_cost += prune_metrics.scan_cost;
|
||||
}
|
||||
|
||||
reader_metrics.merge_from(&prune_metrics);
|
||||
}
|
||||
}
|
||||
@@ -1013,7 +1102,7 @@ pub fn build_file_range_scan_stream(
|
||||
// Reports metrics.
|
||||
reader_metrics.observe_rows(read_type);
|
||||
reader_metrics.filter_metrics.observe();
|
||||
part_metrics.merge_reader_metrics(reader_metrics);
|
||||
part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1030,6 +1119,11 @@ pub fn build_flat_file_range_scan_stream(
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let mut per_file_metrics = if part_metrics.explain_verbose() {
|
||||
Some(HashMap::<RegionFileId, FileMetrics>::new())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let reader_metrics = &mut ReaderMetrics {
|
||||
fetch_metrics: fetch_metrics.clone(),
|
||||
..Default::default()
|
||||
@@ -1058,13 +1152,27 @@ pub fn build_flat_file_range_scan_stream(
|
||||
}
|
||||
|
||||
let prune_metrics = reader.metrics();
|
||||
|
||||
// Update per-file metrics if tracking is enabled
|
||||
if let Some(file_metrics_map) = per_file_metrics.as_mut() {
|
||||
let file_id = range.file_handle().file_id();
|
||||
let file_metrics = file_metrics_map
|
||||
.entry(file_id)
|
||||
.or_insert_with(FileMetrics::default);
|
||||
|
||||
file_metrics.num_ranges += 1;
|
||||
file_metrics.num_rows += prune_metrics.num_rows;
|
||||
file_metrics.build_cost += build_cost;
|
||||
file_metrics.scan_cost += prune_metrics.scan_cost;
|
||||
}
|
||||
|
||||
reader_metrics.merge_from(&prune_metrics);
|
||||
}
|
||||
|
||||
// Reports metrics.
|
||||
reader_metrics.observe_rows(read_type);
|
||||
reader_metrics.filter_metrics.observe();
|
||||
part_metrics.merge_reader_metrics(reader_metrics);
|
||||
part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user