feat: collect the whole fetch time

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-11-26 15:52:20 +08:00
committed by shuiyisong
parent 87d3b17f4d
commit 7e6af2c7ee
2 changed files with 42 additions and 8 deletions

View File

@@ -1212,6 +1212,8 @@ impl RowGroupReaderBuilder {
row_selection: Option<RowSelection>,
fetch_metrics: Option<&ParquetFetchMetrics>,
) -> Result<ParquetRecordBatchReader> {
let fetch_start = Instant::now();
let mut row_group = InMemoryRowGroup::create(
self.file_handle.region_id(),
self.file_handle.file_id().file_id(),
@@ -1229,6 +1231,11 @@ impl RowGroupReaderBuilder {
path: &self.file_path,
})?;
// Record total fetch elapsed time.
if let Some(metrics) = fetch_metrics {
metrics.add_total_fetch_elapsed(fetch_start.elapsed().as_micros() as u64);
}
// Builds the parquet reader.
// Now the row selection is None.
ParquetRecordBatchReader::try_new_with_row_groups(

View File

@@ -51,6 +51,8 @@ pub struct ParquetFetchMetrics {
write_cache_fetch_elapsed: std::sync::atomic::AtomicU64,
/// Elapsed time in microseconds fetching from object store.
store_fetch_elapsed: std::sync::atomic::AtomicU64,
/// Total elapsed time in microseconds for fetching row groups.
total_fetch_elapsed: std::sync::atomic::AtomicU64,
}
impl std::fmt::Debug for ParquetFetchMetrics {
@@ -63,7 +65,8 @@ impl std::fmt::Debug for ParquetFetchMetrics {
let pages_to_fetch = self.pages_to_fetch();
let page_size_to_fetch = self.page_size_to_fetch();
let write_cache_elapsed = self.write_cache_fetch_elapsed();
let object_store_elapsed = self.object_store_fetch_elapsed();
let store_elapsed = self.store_fetch_elapsed();
let total_elapsed = self.total_fetch_elapsed();
if page_cache_hit > 0 {
write!(f, "\"page_cache_hit\":{}", page_cache_hit)?;
@@ -98,12 +101,20 @@ impl std::fmt::Debug for ParquetFetchMetrics {
write!(f, "\"write_cache_fetch_elapsed\":\"{:?}\"", duration)?;
first = false;
}
if object_store_elapsed > 0 {
if store_elapsed > 0 {
if !first {
write!(f, ", ")?;
}
let duration = std::time::Duration::from_micros(object_store_elapsed);
write!(f, "\"object_store_fetch_elapsed\":\"{:?}\"", duration)?;
let duration = std::time::Duration::from_micros(store_elapsed);
write!(f, "\"store_fetch_elapsed\":\"{:?}\"", duration)?;
first = false;
}
if total_elapsed > 0 {
if !first {
write!(f, ", ")?;
}
let duration = std::time::Duration::from_micros(total_elapsed);
write!(f, "\"total_fetch_elapsed\":\"{:?}\"", duration)?;
}
write!(f, "}}")
@@ -172,17 +183,29 @@ impl ParquetFetchMetrics {
}
/// Adds elapsed time in microseconds for object store fetch.
pub fn add_object_store_fetch_elapsed(&self, elapsed_us: u64) {
pub fn add_store_fetch_elapsed(&self, elapsed_us: u64) {
self.store_fetch_elapsed
.fetch_add(elapsed_us, std::sync::atomic::Ordering::Relaxed);
}
/// Returns the elapsed time in microseconds for object store fetch.
pub fn object_store_fetch_elapsed(&self) -> u64 {
pub fn store_fetch_elapsed(&self) -> u64 {
self.store_fetch_elapsed
.load(std::sync::atomic::Ordering::Relaxed)
}
/// Adds elapsed time in microseconds for total fetch operation.
pub fn add_total_fetch_elapsed(&self, elapsed_us: u64) {
self.total_fetch_elapsed
.fetch_add(elapsed_us, std::sync::atomic::Ordering::Relaxed);
}
/// Returns the total elapsed time in microseconds for fetch operations.
pub fn total_fetch_elapsed(&self) -> u64 {
self.total_fetch_elapsed
.load(std::sync::atomic::Ordering::Relaxed)
}
/// Merges metrics from another [ParquetFetchMetrics].
pub fn merge_from(&self, other: &ParquetFetchMetrics) {
self.page_cache_hit
@@ -202,7 +225,11 @@ impl ParquetFetchMetrics {
std::sync::atomic::Ordering::Relaxed,
);
self.store_fetch_elapsed.fetch_add(
other.object_store_fetch_elapsed(),
other.store_fetch_elapsed(),
std::sync::atomic::Ordering::Relaxed,
);
self.total_fetch_elapsed.fetch_add(
other.total_fetch_elapsed(),
std::sync::atomic::Ordering::Relaxed,
);
}
@@ -502,7 +529,7 @@ impl<'a> InMemoryRowGroup<'a> {
.await
.map_err(|e| ParquetError::External(Box::new(e)))?;
if let Some(metrics) = metrics {
metrics.add_object_store_fetch_elapsed(start.elapsed().as_micros() as u64);
metrics.add_store_fetch_elapsed(start.elapsed().as_micros() as u64);
}
data
}