diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 4a22445aef..2982a065c5 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -47,7 +47,7 @@ use crate::sst::parquet::flat_format::time_index_column_index; use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics; use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics; use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics; -use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics}; +use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics}; use crate::sst::parquet::row_group::ParquetFetchMetrics; /// Verbose scan metrics for a partition. @@ -130,6 +130,18 @@ pub(crate) struct ScanMetricsSet { /// The stream reached EOF stream_eof: bool, + + // Optional verbose metrics: + /// Inverted index apply metrics. + inverted_index_apply_metrics: Option, + /// Bloom filter index apply metrics. + bloom_filter_apply_metrics: Option, + /// Fulltext index apply metrics. + fulltext_index_apply_metrics: Option, + /// Parquet fetch metrics. + fetch_metrics: Option, + /// Metadata cache metrics. + metadata_cache_metrics: Option, } impl fmt::Debug for ScanMetricsSet { @@ -170,6 +182,11 @@ impl fmt::Debug for ScanMetricsSet { mem_rows, mem_batches, mem_series, + inverted_index_apply_metrics, + bloom_filter_apply_metrics, + fulltext_index_apply_metrics, + fetch_metrics, + metadata_cache_metrics, } = self; // Write core metrics @@ -259,6 +276,23 @@ impl fmt::Debug for ScanMetricsSet { write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?; } + // Write optional verbose metrics + if let Some(metrics) = inverted_index_apply_metrics { + write!(f, ", \"inverted_index_apply_metrics\":{:?}", metrics)?; + } + if let Some(metrics) = bloom_filter_apply_metrics { + write!(f, ", \"bloom_filter_apply_metrics\":{:?}", metrics)?; + } + if let Some(metrics) = fulltext_index_apply_metrics { + write!(f, ", \"fulltext_index_apply_metrics\":{:?}", metrics)?; + } + if let Some(metrics) = fetch_metrics { + write!(f, ", \"fetch_metrics\":{:?}", metrics)?; + } + if let Some(metrics) = metadata_cache_metrics { + write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?; + } + write!(f, ", \"stream_eof\":{stream_eof}}}") } } @@ -308,17 +342,16 @@ impl ScanMetricsSet { rows_inverted_filtered, rows_bloom_filtered, rows_precise_filtered, - // Optional applier metrics are not collected here. - inverted_index_apply_metrics: _, - bloom_filter_apply_metrics: _, - fulltext_index_apply_metrics: _, + inverted_index_apply_metrics, + bloom_filter_apply_metrics, + fulltext_index_apply_metrics, }, num_record_batches, num_batches, num_rows, scan_cost: _, - metadata_cache_metrics: _, - fetch_metrics: _, + metadata_cache_metrics, + fetch_metrics, } = other; self.build_parts_cost += *build_cost; @@ -338,6 +371,31 @@ impl ScanMetricsSet { self.num_sst_record_batches += *num_record_batches; self.num_sst_batches += *num_batches; self.num_sst_rows += *num_rows; + + // Merge optional verbose metrics + if let Some(metrics) = inverted_index_apply_metrics { + self.inverted_index_apply_metrics + .get_or_insert_with(InvertedIndexApplyMetrics::default) + .merge_from(metrics); + } + if let Some(metrics) = bloom_filter_apply_metrics { + self.bloom_filter_apply_metrics + .get_or_insert_with(BloomFilterIndexApplyMetrics::default) + .merge_from(metrics); + } + if let Some(metrics) = fulltext_index_apply_metrics { + self.fulltext_index_apply_metrics + .get_or_insert_with(FulltextIndexApplyMetrics::default) + .merge_from(metrics); + } + if let Some(metrics) = fetch_metrics { + self.fetch_metrics + .get_or_insert_with(ParquetFetchMetrics::default) + .merge_from(metrics); + } + self.metadata_cache_metrics + .get_or_insert_with(MetadataCacheMetrics::default) + .merge_from(metadata_cache_metrics); } /// Sets distributor metrics.