commit 09effc8128 (parent c14728e3ae)
Author:       evenyag <realevenyag@gmail.com>
Committed by: shuiyisong
Date:         2025-11-24 22:06:06 +08:00

feat: add fetch metrics to ReaderMetrics

Signed-off-by: evenyag <realevenyag@gmail.com>

4 changed files with 65 additions and 19 deletions

@@ -315,6 +315,7 @@ impl ScanMetricsSet {
             num_rows,
             scan_cost: _,
             metadata_cache_metrics: _,
+            fetch_metrics: _,
         } = other;

         self.build_parts_cost += *build_cost;
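Note: the merge above relies on exhaustive struct destructuring, so adding a field (here `fetch_metrics`) without listing it in the merge logic is a compile error rather than a silently dropped metric. A minimal sketch of the pattern, with a hypothetical `Stats` type standing in for `ScanMetricsSet`:

    #[derive(Default)]
    struct Stats {
        build_parts_cost: u64,
        num_rows: usize,
    }

    impl Stats {
        fn merge_from(&mut self, other: &Stats) {
            // Exhaustive destructuring: adding a field to `Stats` without
            // listing it here (even as `_`) fails to compile.
            let Stats {
                build_parts_cost,
                num_rows,
            } = other;
            self.build_parts_cost += *build_parts_cost;
            self.num_rows += *num_rows;
        }
    }

    fn main() {
        let mut a = Stats::default();
        a.merge_from(&Stats { build_parts_cost: 5, num_rows: 100 });
        assert_eq!(a.num_rows, 100);
    }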
@@ -828,11 +829,14 @@ pub fn build_file_range_scan_stream(
     ranges: SmallVec<[FileRange; 2]>,
 ) -> impl Stream<Item = Result<Batch>> {
     try_stream! {
-        let reader_metrics = &mut ReaderMetrics::default();
-        let fetch_metrics = ParquetFetchMetrics::default();
+        let fetch_metrics = Arc::new(ParquetFetchMetrics::default());
+        let reader_metrics = &mut ReaderMetrics {
+            fetch_metrics: Some(fetch_metrics.clone()),
+            ..Default::default()
+        };
         for range in ranges {
             let build_reader_start = Instant::now();
-            let reader = range.reader(stream_ctx.input.series_row_selector, &fetch_metrics).await?;
+            let reader = range.reader(stream_ctx.input.series_row_selector, Some(&fetch_metrics)).await?;
             let build_cost = build_reader_start.elapsed();
             part_metrics.inc_build_reader_cost(build_cost);
             let compat_batch = range.compat_batch();
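Note: the stream builder now shares one set of fetch counters between the per-range readers and the accumulated `ReaderMetrics` through an `Arc`. A self-contained sketch of that wiring, using hypothetical `FetchMetrics`/`Metrics` types in place of the crate's `ParquetFetchMetrics`/`ReaderMetrics`:

    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    #[derive(Default)]
    struct FetchMetrics {
        page_cache_hit: AtomicU64,
    }

    #[derive(Default)]
    struct Metrics {
        num_rows: usize,
        fetch_metrics: Option<Arc<FetchMetrics>>,
    }

    fn main() {
        let fetch_metrics = Arc::new(FetchMetrics::default());
        // Struct update syntax: set one field, default the rest.
        let metrics = Metrics {
            fetch_metrics: Some(fetch_metrics.clone()),
            ..Default::default()
        };
        assert_eq!(metrics.num_rows, 0);
        // An update through one handle is visible through the other,
        // so the accumulated metrics see every fetch.
        fetch_metrics.page_cache_hit.fetch_add(1, Ordering::Relaxed);
        let shared = metrics.fetch_metrics.as_ref().unwrap();
        assert_eq!(shared.page_cache_hit.load(Ordering::Relaxed), 1);
    }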
@@ -864,11 +868,14 @@ pub fn build_flat_file_range_scan_stream(
     ranges: SmallVec<[FileRange; 2]>,
 ) -> impl Stream<Item = Result<RecordBatch>> {
     try_stream! {
-        let reader_metrics = &mut ReaderMetrics::default();
-        let fetch_metrics = ParquetFetchMetrics::default();
+        let fetch_metrics = Arc::new(ParquetFetchMetrics::default());
+        let reader_metrics = &mut ReaderMetrics {
+            fetch_metrics: Some(fetch_metrics.clone()),
+            ..Default::default()
+        };
         for range in ranges {
             let build_reader_start = Instant::now();
-            let mut reader = range.flat_reader(&fetch_metrics).await?;
+            let mut reader = range.flat_reader(Some(&fetch_metrics)).await?;
             let build_cost = build_reader_start.elapsed();
             part_metrics.inc_build_reader_cost(build_cost);


@@ -118,7 +118,7 @@ impl FileRange {
     pub(crate) async fn reader(
         &self,
         selector: Option<TimeSeriesRowSelector>,
-        fetch_metrics: &ParquetFetchMetrics,
+        fetch_metrics: Option<&ParquetFetchMetrics>,
     ) -> Result<PruneReader> {
         let parquet_reader = self
             .context
@@ -176,7 +176,7 @@ impl FileRange {
     /// Creates a flat reader that returns RecordBatch.
     pub(crate) async fn flat_reader(
         &self,
-        fetch_metrics: &ParquetFetchMetrics,
+        fetch_metrics: Option<&ParquetFetchMetrics>,
     ) -> Result<FlatPruneReader> {
         let parquet_reader = self
             .context
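Note: switching the parameter from `&ParquetFetchMetrics` to `Option<&ParquetFetchMetrics>` lets call sites with nothing to report pass `None` instead of constructing a throwaway metrics struct. A sketch of the call-site ergonomics, with a hypothetical `build_reader` function and `FetchMetrics` type:

    use std::sync::Arc;

    #[derive(Default)]
    struct FetchMetrics; // hypothetical stand-in for ParquetFetchMetrics

    // Option<&T> rather than &Option<T>: callers with an owned value,
    // no value, or an Option<Arc<T>> can all produce it cheaply.
    fn build_reader(fetch_metrics: Option<&FetchMetrics>) {
        let _ = fetch_metrics;
    }

    fn main() {
        let owned = FetchMetrics;
        build_reader(Some(&owned)); // path that collects metrics
        build_reader(None);         // path with nothing to report

        let shared: Option<Arc<FetchMetrics>> = Some(Arc::new(FetchMetrics));
        build_reader(shared.as_deref()); // borrow out of an Option<Arc<_>>
    }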


@@ -1089,6 +1089,8 @@ pub struct ReaderMetrics {
     pub(crate) num_rows: usize,
     /// Metrics for parquet metadata cache.
     pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
+    /// Optional metrics for page/row group fetch operations.
+    pub(crate) fetch_metrics: Option<Arc<ParquetFetchMetrics>>,
 }
 
 impl ReaderMetrics {
@@ -1102,6 +1104,13 @@ impl ReaderMetrics {
         self.num_rows += other.num_rows;
         self.metadata_cache_metrics
             .merge_from(&other.metadata_cache_metrics);
+        if let Some(other_fetch) = &other.fetch_metrics {
+            if let Some(self_fetch) = &self.fetch_metrics {
+                self_fetch.merge_from(other_fetch);
+            } else {
+                self.fetch_metrics = Some(other_fetch.clone());
+            }
+        }
     }
 
     /// Reports total rows.
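Note: the merge folds the atomic counters together when both sides carry fetch metrics, and when only the other side does, it adopts that side's `Arc`, a pointer copy rather than a deep copy. A runnable miniature of that logic with hypothetical types:

    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    #[derive(Default)]
    struct FetchMetrics {
        hits: AtomicU64,
    }

    impl FetchMetrics {
        fn merge_from(&self, other: &FetchMetrics) {
            self.hits
                .fetch_add(other.hits.load(Ordering::Relaxed), Ordering::Relaxed);
        }
    }

    fn merge(dst: &mut Option<Arc<FetchMetrics>>, src: &Option<Arc<FetchMetrics>>) {
        if let Some(src) = src {
            match dst {
                // Both sides collected metrics: fold the counters together.
                Some(dst) => dst.merge_from(src),
                // Only the source did: share its Arc, a cheap pointer clone.
                None => *dst = Some(Arc::clone(src)),
            }
        }
    }

    fn main() {
        let src = Some(Arc::new(FetchMetrics::default()));
        src.as_ref().unwrap().hits.fetch_add(3, Ordering::Relaxed);

        let mut dst: Option<Arc<FetchMetrics>> = None;
        merge(&mut dst, &src);
        assert_eq!(dst.unwrap().hits.load(Ordering::Relaxed), 3);
    }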
@@ -1156,7 +1165,7 @@ impl RowGroupReaderBuilder {
         &self,
         row_group_idx: usize,
         row_selection: Option<RowSelection>,
-        fetch_metrics: &ParquetFetchMetrics,
+        fetch_metrics: Option<&ParquetFetchMetrics>,
     ) -> Result<ParquetRecordBatchReader> {
         let mut row_group = InMemoryRowGroup::create(
             self.file_handle.region_id(),
@@ -1339,7 +1348,7 @@ impl BatchReader for ParquetReader {
         let parquet_reader = self
             .context
             .reader_builder()
-            .build(row_group_idx, Some(row_selection), &self.fetch_metrics)
+            .build(row_group_idx, Some(row_selection), Some(&self.fetch_metrics))
             .await?;
 
         // Resets the parquet reader.
@@ -1400,7 +1409,7 @@ impl ParquetReader {
         let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
             let parquet_reader = context
                 .reader_builder()
-                .build(row_group_idx, Some(row_selection), &fetch_metrics)
+                .build(row_group_idx, Some(row_selection), Some(&fetch_metrics))
                 .await?;
             // Compute skip_fields once for this row group
             let skip_fields = context.should_skip_fields(row_group_idx);


@@ -125,6 +125,26 @@ impl ParquetFetchMetrics {
         self.object_store_fetch_elapsed
             .load(std::sync::atomic::Ordering::Relaxed)
     }
+
+    /// Merges metrics from another [ParquetFetchMetrics].
+    pub fn merge_from(&self, other: &ParquetFetchMetrics) {
+        self.page_cache_hit
+            .fetch_add(other.page_cache_hit(), std::sync::atomic::Ordering::Relaxed);
+        self.page_cache_miss
+            .fetch_add(other.page_cache_miss(), std::sync::atomic::Ordering::Relaxed);
+        self.write_cache_hit
+            .fetch_add(other.write_cache_hit(), std::sync::atomic::Ordering::Relaxed);
+        self.write_cache_miss
+            .fetch_add(other.write_cache_miss(), std::sync::atomic::Ordering::Relaxed);
+        self.write_cache_fetch_elapsed.fetch_add(
+            other.write_cache_fetch_elapsed(),
+            std::sync::atomic::Ordering::Relaxed,
+        );
+        self.object_store_fetch_elapsed.fetch_add(
+            other.object_store_fetch_elapsed(),
+            std::sync::atomic::Ordering::Relaxed,
+        );
+    }
 }
 
 pub(crate) struct RowGroupBase<'a> {
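Note: `merge_from` takes `&self`, not `&mut self`; the counters are atomics, so they can be updated through a shared `Arc` from multiple readers, and `Relaxed` ordering is enough because each counter is an independent statistic with no cross-counter ordering requirement. A compact sketch of the same style (the `Counters` type and its fields are illustrative):

    use std::sync::atomic::{AtomicU64, Ordering};

    #[derive(Default)]
    struct Counters {
        cache_hit: AtomicU64,
        fetch_elapsed_us: AtomicU64,
    }

    impl Counters {
        // A field and a getter may share a name, as in ParquetFetchMetrics.
        fn cache_hit(&self) -> u64 {
            self.cache_hit.load(Ordering::Relaxed)
        }

        // &self thanks to interior mutability; safe to call through an Arc.
        fn merge_from(&self, other: &Counters) {
            self.cache_hit.fetch_add(other.cache_hit(), Ordering::Relaxed);
            self.fetch_elapsed_us.fetch_add(
                other.fetch_elapsed_us.load(Ordering::Relaxed),
                Ordering::Relaxed,
            );
        }
    }

    fn main() {
        let a = Counters::default();
        let b = Counters::default();
        b.cache_hit.fetch_add(2, Ordering::Relaxed);
        a.merge_from(&b);
        assert_eq!(a.cache_hit(), 2);
    }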
@@ -336,7 +356,7 @@ impl<'a> InMemoryRowGroup<'a> {
         &mut self,
         projection: &ProjectionMask,
         selection: Option<&RowSelection>,
-        metrics: &ParquetFetchMetrics,
+        metrics: Option<&ParquetFetchMetrics>,
     ) -> Result<()> {
         if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
             let (fetch_ranges, page_start_offsets) =
@@ -375,28 +395,36 @@ impl<'a> InMemoryRowGroup<'a> {
     async fn fetch_bytes(
         &self,
         ranges: &[Range<u64>],
-        metrics: &ParquetFetchMetrics,
+        metrics: Option<&ParquetFetchMetrics>,
     ) -> Result<Vec<Bytes>> {
         // Now fetch page timer includes the whole time to read pages.
         let _timer = READ_STAGE_FETCH_PAGES.start_timer();
         let page_key = PageKey::new(self.file_id, self.row_group_idx, ranges.to_vec());
         if let Some(pages) = self.cache_strategy.get_pages(&page_key) {
-            metrics.inc_page_cache_hit();
+            if let Some(metrics) = metrics {
+                metrics.inc_page_cache_hit();
+            }
             return Ok(pages.compressed.clone());
         }
-        metrics.inc_page_cache_miss();
+        if let Some(metrics) = metrics {
+            metrics.inc_page_cache_miss();
+        }
 
         let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet);
         let start = std::time::Instant::now();
         let write_cache_result = self.fetch_ranges_from_write_cache(key, ranges).await;
         let pages = match write_cache_result {
             Some(data) => {
-                metrics.add_write_cache_fetch_elapsed(start.elapsed().as_micros() as u64);
-                metrics.inc_write_cache_hit();
+                if let Some(metrics) = metrics {
+                    metrics.add_write_cache_fetch_elapsed(start.elapsed().as_micros() as u64);
+                    metrics.inc_write_cache_hit();
+                }
                 data
             }
             None => {
-                metrics.inc_write_cache_miss();
+                if let Some(metrics) = metrics {
+                    metrics.inc_write_cache_miss();
+                }
                 // Fetch data from object store.
                 let _timer = READ_STAGE_ELAPSED
                     .with_label_values(&["cache_miss_read"])
@@ -406,7 +434,9 @@ impl<'a> InMemoryRowGroup<'a> {
                 let data = fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges)
                     .await
                     .map_err(|e| ParquetError::External(Box::new(e)))?;
-                metrics.add_object_store_fetch_elapsed(start.elapsed().as_micros() as u64);
+                if let Some(metrics) = metrics {
+                    metrics.add_object_store_fetch_elapsed(start.elapsed().as_micros() as u64);
+                }
                 data
             }
         };
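Note: every metrics touch point in fetch_bytes is wrapped in `if let Some(metrics)`, so callers that pass `None` pay only a branch and generate no counter traffic. A runnable sketch of that guarded-timing shape (the `FetchMetrics` type and the simulated read are hypothetical):

    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Instant;

    #[derive(Default)]
    struct FetchMetrics {
        object_store_fetch_us: AtomicU64,
    }

    fn fetch_bytes(metrics: Option<&FetchMetrics>) -> Vec<u8> {
        let start = Instant::now();
        let data = vec![0u8; 16]; // stands in for the object-store read
        // Guarded instrumentation: a None caller skips the timing entirely.
        if let Some(metrics) = metrics {
            metrics
                .object_store_fetch_us
                .fetch_add(start.elapsed().as_micros() as u64, Ordering::Relaxed);
        }
        data
    }

    fn main() {
        let metrics = FetchMetrics::default();
        let _ = fetch_bytes(Some(&metrics)); // instrumented path
        let _ = fetch_bytes(None);           // uninstrumented path
        println!(
            "object store fetch: {} us",
            metrics.object_store_fetch_us.load(Ordering::Relaxed)
        );
    }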