From 7e6af2c7eee3310c5a9849d969429d0cea3eb9d5 Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 26 Nov 2025 15:52:20 +0800 Subject: [PATCH] feat: collect the whole fetch time Signed-off-by: evenyag --- src/mito2/src/sst/parquet/reader.rs | 7 +++++ src/mito2/src/sst/parquet/row_group.rs | 43 +++++++++++++++++++++----- 2 files changed, 42 insertions(+), 8 deletions(-) diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index f9805df86d..33a48372d6 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -1212,6 +1212,8 @@ impl RowGroupReaderBuilder { row_selection: Option, fetch_metrics: Option<&ParquetFetchMetrics>, ) -> Result { + let fetch_start = Instant::now(); + let mut row_group = InMemoryRowGroup::create( self.file_handle.region_id(), self.file_handle.file_id().file_id(), @@ -1229,6 +1231,11 @@ impl RowGroupReaderBuilder { path: &self.file_path, })?; + // Record total fetch elapsed time. + if let Some(metrics) = fetch_metrics { + metrics.add_total_fetch_elapsed(fetch_start.elapsed().as_micros() as u64); + } + // Builds the parquet reader. // Now the row selection is None. ParquetRecordBatchReader::try_new_with_row_groups( diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs index 8de0256c88..03288f1cb7 100644 --- a/src/mito2/src/sst/parquet/row_group.rs +++ b/src/mito2/src/sst/parquet/row_group.rs @@ -51,6 +51,8 @@ pub struct ParquetFetchMetrics { write_cache_fetch_elapsed: std::sync::atomic::AtomicU64, /// Elapsed time in microseconds fetching from object store. store_fetch_elapsed: std::sync::atomic::AtomicU64, + /// Total elapsed time in microseconds for fetching row groups. + total_fetch_elapsed: std::sync::atomic::AtomicU64, } impl std::fmt::Debug for ParquetFetchMetrics { @@ -63,7 +65,8 @@ impl std::fmt::Debug for ParquetFetchMetrics { let pages_to_fetch = self.pages_to_fetch(); let page_size_to_fetch = self.page_size_to_fetch(); let write_cache_elapsed = self.write_cache_fetch_elapsed(); - let object_store_elapsed = self.object_store_fetch_elapsed(); + let store_elapsed = self.store_fetch_elapsed(); + let total_elapsed = self.total_fetch_elapsed(); if page_cache_hit > 0 { write!(f, "\"page_cache_hit\":{}", page_cache_hit)?; @@ -98,12 +101,20 @@ impl std::fmt::Debug for ParquetFetchMetrics { write!(f, "\"write_cache_fetch_elapsed\":\"{:?}\"", duration)?; first = false; } - if object_store_elapsed > 0 { + if store_elapsed > 0 { if !first { write!(f, ", ")?; } - let duration = std::time::Duration::from_micros(object_store_elapsed); - write!(f, "\"object_store_fetch_elapsed\":\"{:?}\"", duration)?; + let duration = std::time::Duration::from_micros(store_elapsed); + write!(f, "\"store_fetch_elapsed\":\"{:?}\"", duration)?; + first = false; + } + if total_elapsed > 0 { + if !first { + write!(f, ", ")?; + } + let duration = std::time::Duration::from_micros(total_elapsed); + write!(f, "\"total_fetch_elapsed\":\"{:?}\"", duration)?; } write!(f, "}}") @@ -172,17 +183,29 @@ impl ParquetFetchMetrics { } /// Adds elapsed time in microseconds for object store fetch. - pub fn add_object_store_fetch_elapsed(&self, elapsed_us: u64) { + pub fn add_store_fetch_elapsed(&self, elapsed_us: u64) { self.store_fetch_elapsed .fetch_add(elapsed_us, std::sync::atomic::Ordering::Relaxed); } /// Returns the elapsed time in microseconds for object store fetch. - pub fn object_store_fetch_elapsed(&self) -> u64 { + pub fn store_fetch_elapsed(&self) -> u64 { self.store_fetch_elapsed .load(std::sync::atomic::Ordering::Relaxed) } + /// Adds elapsed time in microseconds for total fetch operation. + pub fn add_total_fetch_elapsed(&self, elapsed_us: u64) { + self.total_fetch_elapsed + .fetch_add(elapsed_us, std::sync::atomic::Ordering::Relaxed); + } + + /// Returns the total elapsed time in microseconds for fetch operations. + pub fn total_fetch_elapsed(&self) -> u64 { + self.total_fetch_elapsed + .load(std::sync::atomic::Ordering::Relaxed) + } + /// Merges metrics from another [ParquetFetchMetrics]. pub fn merge_from(&self, other: &ParquetFetchMetrics) { self.page_cache_hit @@ -202,7 +225,11 @@ impl ParquetFetchMetrics { std::sync::atomic::Ordering::Relaxed, ); self.store_fetch_elapsed.fetch_add( - other.object_store_fetch_elapsed(), + other.store_fetch_elapsed(), + std::sync::atomic::Ordering::Relaxed, + ); + self.total_fetch_elapsed.fetch_add( + other.total_fetch_elapsed(), std::sync::atomic::Ordering::Relaxed, ); } @@ -502,7 +529,7 @@ impl<'a> InMemoryRowGroup<'a> { .await .map_err(|e| ParquetError::External(Box::new(e)))?; if let Some(metrics) = metrics { - metrics.add_object_store_fetch_elapsed(start.elapsed().as_micros() as u64); + metrics.add_store_fetch_elapsed(start.elapsed().as_micros() as u64); } data }