From 09effc81286a3549dbe485fa3c4a527d09914ef8 Mon Sep 17 00:00:00 2001
From: evenyag
Date: Mon, 24 Nov 2025 22:06:06 +0800
Subject: [PATCH] feat: add fetch metrics to ReaderMetrics

Signed-off-by: evenyag
---
 src/mito2/src/read/scan_util.rs         | 19 ++++++----
 src/mito2/src/sst/parquet/file_range.rs |  4 +--
 src/mito2/src/sst/parquet/reader.rs     | 15 ++++++--
 src/mito2/src/sst/parquet/row_group.rs  | 46 ++++++++++++++++++++----
 4 files changed, 65 insertions(+), 19 deletions(-)

diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs
index cc2f567a21..e73d2122a3 100644
--- a/src/mito2/src/read/scan_util.rs
+++ b/src/mito2/src/read/scan_util.rs
@@ -315,6 +315,7 @@ impl ScanMetricsSet {
             num_rows,
             scan_cost: _,
             metadata_cache_metrics: _,
+            fetch_metrics: _,
         } = other;
 
         self.build_parts_cost += *build_cost;
@@ -828,11 +829,14 @@ pub fn build_file_range_scan_stream(
     ranges: SmallVec<[FileRange; 2]>,
 ) -> impl Stream<Item = Result<Batch>> {
     try_stream! {
-        let reader_metrics = &mut ReaderMetrics::default();
-        let fetch_metrics = ParquetFetchMetrics::default();
+        let fetch_metrics = Arc::new(ParquetFetchMetrics::default());
+        let reader_metrics = &mut ReaderMetrics {
+            fetch_metrics: Some(fetch_metrics.clone()),
+            ..Default::default()
+        };
         for range in ranges {
             let build_reader_start = Instant::now();
-            let reader = range.reader(stream_ctx.input.series_row_selector, &fetch_metrics).await?;
+            let reader = range.reader(stream_ctx.input.series_row_selector, Some(&fetch_metrics)).await?;
             let build_cost = build_reader_start.elapsed();
             part_metrics.inc_build_reader_cost(build_cost);
             let compat_batch = range.compat_batch();
@@ -864,11 +868,14 @@ pub fn build_flat_file_range_scan_stream(
     ranges: SmallVec<[FileRange; 2]>,
 ) -> impl Stream<Item = Result<RecordBatch>> {
     try_stream! {
-        let reader_metrics = &mut ReaderMetrics::default();
-        let fetch_metrics = ParquetFetchMetrics::default();
+        let fetch_metrics = Arc::new(ParquetFetchMetrics::default());
+        let reader_metrics = &mut ReaderMetrics {
+            fetch_metrics: Some(fetch_metrics.clone()),
+            ..Default::default()
+        };
         for range in ranges {
             let build_reader_start = Instant::now();
-            let mut reader = range.flat_reader(&fetch_metrics).await?;
+            let mut reader = range.flat_reader(Some(&fetch_metrics)).await?;
             let build_cost = build_reader_start.elapsed();
             part_metrics.inc_build_reader_cost(build_cost);
 
diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs
index c1c5af7a39..46cd53e6ea 100644
--- a/src/mito2/src/sst/parquet/file_range.rs
+++ b/src/mito2/src/sst/parquet/file_range.rs
@@ -118,7 +118,7 @@ impl FileRange {
     pub(crate) async fn reader(
         &self,
         selector: Option<TimeSeriesRowSelector>,
-        fetch_metrics: &ParquetFetchMetrics,
+        fetch_metrics: Option<&ParquetFetchMetrics>,
     ) -> Result<PruneReader> {
         let parquet_reader = self
             .context
@@ -176,7 +176,7 @@
     /// Creates a flat reader that returns RecordBatch.
     pub(crate) async fn flat_reader(
         &self,
-        fetch_metrics: &ParquetFetchMetrics,
+        fetch_metrics: Option<&ParquetFetchMetrics>,
     ) -> Result {
         let parquet_reader = self
             .context
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index 2f4d204837..7ea3f4c497 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -1089,6 +1089,8 @@ pub struct ReaderMetrics {
     pub(crate) num_rows: usize,
     /// Metrics for parquet metadata cache.
     pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
+    /// Optional metrics for page/row group fetch operations.
+    pub(crate) fetch_metrics: Option<Arc<ParquetFetchMetrics>>,
 }
 
 impl ReaderMetrics {
@@ -1102,6 +1104,13 @@ impl ReaderMetrics {
         self.num_rows += other.num_rows;
         self.metadata_cache_metrics
             .merge_from(&other.metadata_cache_metrics);
+        if let Some(other_fetch) = &other.fetch_metrics {
+            if let Some(self_fetch) = &self.fetch_metrics {
+                self_fetch.merge_from(other_fetch);
+            } else {
+                self.fetch_metrics = Some(other_fetch.clone());
+            }
+        }
     }
 
     /// Reports total rows.
@@ -1156,7 +1165,7 @@ impl RowGroupReaderBuilder {
         &self,
         row_group_idx: usize,
         row_selection: Option<RowSelection>,
-        fetch_metrics: &ParquetFetchMetrics,
+        fetch_metrics: Option<&ParquetFetchMetrics>,
     ) -> Result<ParquetRecordBatchReader> {
         let mut row_group = InMemoryRowGroup::create(
             self.file_handle.region_id(),
@@ -1339,7 +1348,7 @@ impl BatchReader for ParquetReader {
         let parquet_reader = self
             .context
             .reader_builder()
-            .build(row_group_idx, Some(row_selection), &self.fetch_metrics)
+            .build(row_group_idx, Some(row_selection), Some(&self.fetch_metrics))
             .await?;
 
         // Resets the parquet reader.
@@ -1400,7 +1409,7 @@ impl ParquetReader {
         let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
             let parquet_reader = context
                 .reader_builder()
-                .build(row_group_idx, Some(row_selection), &fetch_metrics)
+                .build(row_group_idx, Some(row_selection), Some(&fetch_metrics))
                 .await?;
             // Compute skip_fields once for this row group
             let skip_fields = context.should_skip_fields(row_group_idx);
diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs
index 7df0c66c14..83f2ab96d3 100644
--- a/src/mito2/src/sst/parquet/row_group.rs
+++ b/src/mito2/src/sst/parquet/row_group.rs
@@ -125,6 +125,26 @@ impl ParquetFetchMetrics {
         self.object_store_fetch_elapsed
             .load(std::sync::atomic::Ordering::Relaxed)
     }
+
+    /// Merges metrics from another [ParquetFetchMetrics].
+    pub fn merge_from(&self, other: &ParquetFetchMetrics) {
+        self.page_cache_hit
+            .fetch_add(other.page_cache_hit(), std::sync::atomic::Ordering::Relaxed);
+        self.page_cache_miss
+            .fetch_add(other.page_cache_miss(), std::sync::atomic::Ordering::Relaxed);
+        self.write_cache_hit
+            .fetch_add(other.write_cache_hit(), std::sync::atomic::Ordering::Relaxed);
+        self.write_cache_miss
+            .fetch_add(other.write_cache_miss(), std::sync::atomic::Ordering::Relaxed);
+        self.write_cache_fetch_elapsed.fetch_add(
+            other.write_cache_fetch_elapsed(),
+            std::sync::atomic::Ordering::Relaxed,
+        );
+        self.object_store_fetch_elapsed.fetch_add(
+            other.object_store_fetch_elapsed(),
+            std::sync::atomic::Ordering::Relaxed,
+        );
+    }
 }
 
 pub(crate) struct RowGroupBase<'a> {
@@ -336,7 +356,7 @@ impl<'a> InMemoryRowGroup<'a> {
         &mut self,
         projection: &ProjectionMask,
         selection: Option<&RowSelection>,
-        metrics: &ParquetFetchMetrics,
+        metrics: Option<&ParquetFetchMetrics>,
     ) -> Result<()> {
         if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
             let (fetch_ranges, page_start_offsets) =
@@ -375,28 +395,36 @@ impl<'a> InMemoryRowGroup<'a> {
     async fn fetch_bytes(
         &self,
         ranges: &[Range<u64>],
-        metrics: &ParquetFetchMetrics,
+        metrics: Option<&ParquetFetchMetrics>,
     ) -> Result<Vec<Bytes>> {
         // Now fetch page timer includes the whole time to read pages.
         let _timer = READ_STAGE_FETCH_PAGES.start_timer();
         let page_key = PageKey::new(self.file_id, self.row_group_idx, ranges.to_vec());
         if let Some(pages) = self.cache_strategy.get_pages(&page_key) {
-            metrics.inc_page_cache_hit();
+            if let Some(metrics) = metrics {
+                metrics.inc_page_cache_hit();
+            }
             return Ok(pages.compressed.clone());
         }
-        metrics.inc_page_cache_miss();
+        if let Some(metrics) = metrics {
+            metrics.inc_page_cache_miss();
+        }
         let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet);
         let start = std::time::Instant::now();
         let write_cache_result = self.fetch_ranges_from_write_cache(key, ranges).await;
         let pages = match write_cache_result {
             Some(data) => {
-                metrics.add_write_cache_fetch_elapsed(start.elapsed().as_micros() as u64);
-                metrics.inc_write_cache_hit();
+                if let Some(metrics) = metrics {
+                    metrics.add_write_cache_fetch_elapsed(start.elapsed().as_micros() as u64);
+                    metrics.inc_write_cache_hit();
+                }
                 data
             }
             None => {
-                metrics.inc_write_cache_miss();
+                if let Some(metrics) = metrics {
+                    metrics.inc_write_cache_miss();
+                }
                 // Fetch data from object store.
                 let _timer = READ_STAGE_ELAPSED
                     .with_label_values(&["cache_miss_read"])
                     .start_timer();
@@ -406,7 +434,9 @@ impl<'a> InMemoryRowGroup<'a> {
                 let start = std::time::Instant::now();
                 let data = fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges)
                     .await
                     .map_err(|e| ParquetError::External(Box::new(e)))?;
-                metrics.add_object_store_fetch_elapsed(start.elapsed().as_micros() as u64);
+                if let Some(metrics) = metrics {
+                    metrics.add_object_store_fetch_elapsed(start.elapsed().as_micros() as u64);
+                }
                 data
             }
         };
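
Note (illustration, not part of the commit): the patch combines three ideas that
can be exercised in isolation: interior-mutable counters shared through an Arc,
the merge-or-adopt logic of ReaderMetrics::merge_from, and Option-guarded
recording so call sites without metrics pass None. The sketch below is a
minimal, self-contained reproduction using hypothetical stand-in types
(FetchMetrics, ReaderMetrics, record_page_lookup), not the real mito2 types.

    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    // Stand-in for ParquetFetchMetrics: counters with interior mutability,
    // so `&self` methods can update them from several readers at once.
    #[derive(Default)]
    struct FetchMetrics {
        page_cache_hit: AtomicU64,
        page_cache_miss: AtomicU64,
    }

    impl FetchMetrics {
        fn inc_page_cache_hit(&self) {
            self.page_cache_hit.fetch_add(1, Ordering::Relaxed);
        }

        fn inc_page_cache_miss(&self) {
            self.page_cache_miss.fetch_add(1, Ordering::Relaxed);
        }

        // Same shape as the patch's merge_from: relaxed fetch_add per counter.
        fn merge_from(&self, other: &FetchMetrics) {
            self.page_cache_hit
                .fetch_add(other.page_cache_hit.load(Ordering::Relaxed), Ordering::Relaxed);
            self.page_cache_miss
                .fetch_add(other.page_cache_miss.load(Ordering::Relaxed), Ordering::Relaxed);
        }
    }

    // Stand-in for ReaderMetrics with the new optional field.
    #[derive(Default)]
    struct ReaderMetrics {
        num_rows: usize,
        fetch_metrics: Option<Arc<FetchMetrics>>,
    }

    impl ReaderMetrics {
        // Merge-or-adopt, as in the patch: accumulate into our counters if we
        // already have some, otherwise share the other side's Arc.
        fn merge_from(&mut self, other: &ReaderMetrics) {
            self.num_rows += other.num_rows;
            if let Some(other_fetch) = &other.fetch_metrics {
                if let Some(self_fetch) = &self.fetch_metrics {
                    self_fetch.merge_from(other_fetch);
                } else {
                    self.fetch_metrics = Some(other_fetch.clone());
                }
            }
        }
    }

    // Option-guarded recording, as in fetch_bytes: metrics-less callers pass None.
    fn record_page_lookup(hit: bool, metrics: Option<&FetchMetrics>) {
        if let Some(metrics) = metrics {
            if hit {
                metrics.inc_page_cache_hit();
            } else {
                metrics.inc_page_cache_miss();
            }
        }
    }

    fn main() {
        let fetch = Arc::new(FetchMetrics::default());
        let mut scan_metrics = ReaderMetrics {
            fetch_metrics: Some(fetch.clone()),
            ..Default::default()
        };

        // The stream and its readers observe the same counters through the Arc.
        record_page_lookup(true, Some(&fetch));
        record_page_lookup(false, None); // metrics disabled for this caller

        let other = ReaderMetrics {
            num_rows: 42,
            fetch_metrics: None,
        };
        scan_metrics.merge_from(&other);

        assert_eq!(scan_metrics.num_rows, 42);
        assert_eq!(fetch.page_cache_hit.load(Ordering::Relaxed), 1);
        assert_eq!(fetch.page_cache_miss.load(Ordering::Relaxed), 0);
    }

Because the counters live behind an Arc and are updated with relaxed atomics,
cloning the Arc into ReaderMetrics (as the scan streams do) and mutating through
Option<&ParquetFetchMetrics> observe the same totals without any locking.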
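The elapsed-time counters follow the same discipline: wall time is measured with
std::time::Instant around each fetch attempt and accumulated as integer
microseconds, which keeps the field a plain AtomicU64. A minimal sketch of that
accumulation, again with hypothetical names (FetchTimer, add_elapsed, total_us):

    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Instant;

    // Hypothetical accumulator mirroring write_cache_fetch_elapsed /
    // object_store_fetch_elapsed: elapsed wall time stored as microseconds.
    #[derive(Default)]
    struct FetchTimer {
        elapsed_us: AtomicU64,
    }

    impl FetchTimer {
        fn add_elapsed(&self, us: u64) {
            self.elapsed_us.fetch_add(us, Ordering::Relaxed);
        }

        fn total_us(&self) -> u64 {
            self.elapsed_us.load(Ordering::Relaxed)
        }
    }

    fn main() {
        let timer = FetchTimer::default();

        // Same shape as the patch: time one fetch attempt, record on completion.
        let start = Instant::now();
        std::thread::sleep(std::time::Duration::from_millis(2)); // stand-in for a fetch
        timer.add_elapsed(start.elapsed().as_micros() as u64);

        assert!(timer.total_us() >= 2_000);
        println!("accumulated fetch time: {} us", timer.total_us());
    }

Truncating as_micros() from u128 to u64, as the patch does, is safe in practice:
u64 microseconds cover on the order of hundreds of thousands of years.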