From 69cf13b33a6958e0dc895dafedcb1cd8ae64127e Mon Sep 17 00:00:00 2001
From: evenyag
Date: Mon, 24 Nov 2025 20:18:05 +0800
Subject: [PATCH] feat: add parquet metadata metrics

Signed-off-by: evenyag
---
Review notes (this region is ignored when the patch is applied):

* Adds `MetadataCacheMetrics` (mem/file cache hit and miss counters plus
  `metadata_load_cost`) to `reader.rs`, embeds it in `ReaderMetrics`, and
  folds it in from `ReaderMetrics::merge_from`.
* `CacheStrategy::get_parquet_meta_data_with_metrics` delegates to
  `CacheManager` for `EnableAll` and `Compaction`; `Disabled` records one
  miss at both cache levels and returns `None`.
* `CacheManager::get_parquet_meta_data_with_metrics` checks the in-memory
  SST meta cache first, then the write cache's file cache; on a file cache
  hit it also stores the metadata into the in-memory cache.
* The in-memory lookup goes through the new private
  `get_parquet_meta_data_from_mem_cache_inner`, which per its doc comment
  does not update the global metrics.
* `file_cache_miss` is incremented whenever the file cache lookup does not
  return metadata, which includes the case where `self.write_cache` is
  `None`.
* `read_parquet_metadata` adds to `metadata_load_cost` on a cache hit and
  after a successful load; if `metadata_loader.load()` fails, the `?`
  returns before the cost is recorded.
* `ScanMetricsSet` only destructures the new field as `_` here, so the
  counters are not merged into the scan metrics by this patch.
* NOTE(review): in the copy under review the newlines were collapsed and
  the generic arguments were missing (`Option>`, `Result>`, `Arc)`). They
  are restored below as `Arc<ParquetMetaData>` based on the surrounding
  code; leading indentation is reconstructed. Confirm against the original
  commit. The author e-mail after `From:` / `Signed-off-by:` also appears
  to have been stripped and is left untouched.

 src/mito2/src/cache.rs              | 68 +++++++++++++++++++++++++++++
 src/mito2/src/read/scan_util.rs     |  1 +
 src/mito2/src/sst/parquet/reader.rs | 47 ++++++++++++++++++--
 3 files changed, 113 insertions(+), 3 deletions(-)

diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs
index b3a9bfb2df..eacffc19f5 100644
--- a/src/mito2/src/cache.rs
+++ b/src/mito2/src/cache.rs
@@ -44,6 +44,7 @@ use crate::cache::write_cache::WriteCacheRef;
 use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
 use crate::read::Batch;
 use crate::sst::file::RegionFileId;
+use crate::sst::parquet::reader::MetadataCacheMetrics;
 
 /// Metrics type key for sst meta.
 const SST_META_TYPE: &str = "sst_meta";
@@ -90,6 +91,32 @@ impl CacheStrategy {
         }
     }
 
+    /// Gets parquet metadata with cache metrics tracking.
+    /// Returns the metadata and updates the provided metrics.
+    pub(crate) async fn get_parquet_meta_data_with_metrics(
+        &self,
+        file_id: RegionFileId,
+        metrics: &mut MetadataCacheMetrics,
+    ) -> Option<Arc<ParquetMetaData>> {
+        match self {
+            CacheStrategy::EnableAll(cache_manager) => {
+                cache_manager
+                    .get_parquet_meta_data_with_metrics(file_id, metrics)
+                    .await
+            }
+            CacheStrategy::Compaction(cache_manager) => {
+                cache_manager
+                    .get_parquet_meta_data_with_metrics(file_id, metrics)
+                    .await
+            }
+            CacheStrategy::Disabled => {
+                metrics.mem_cache_miss += 1;
+                metrics.file_cache_miss += 1;
+                None
+            }
+        }
+    }
+
     /// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
     pub fn get_parquet_meta_data_from_mem_cache(
         &self,
@@ -317,6 +344,36 @@ impl CacheManager {
         None
     }
 
+    /// Gets cached [ParquetMetaData] with metrics tracking.
+    /// Tries in-memory cache first, then file cache, updating metrics accordingly.
+    pub(crate) async fn get_parquet_meta_data_with_metrics(
+        &self,
+        file_id: RegionFileId,
+        metrics: &mut MetadataCacheMetrics,
+    ) -> Option<Arc<ParquetMetaData>> {
+        // Try to get metadata from sst meta cache
+        if let Some(metadata) = self.get_parquet_meta_data_from_mem_cache_inner(file_id) {
+            metrics.mem_cache_hit += 1;
+            return Some(metadata);
+        }
+        metrics.mem_cache_miss += 1;
+
+        // Try to get metadata from write cache
+        let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
+        if let Some(write_cache) = &self.write_cache
+            && let Some(metadata) = write_cache.file_cache().get_parquet_meta_data(key).await
+        {
+            metrics.file_cache_hit += 1;
+            let metadata = Arc::new(metadata);
+            // Put metadata into sst meta cache
+            self.put_parquet_meta_data(file_id, metadata.clone());
+            return Some(metadata);
+        };
+        metrics.file_cache_miss += 1;
+
+        None
+    }
+
     /// Gets cached [ParquetMetaData] from in-memory cache.
     /// This method does not perform I/O.
     pub fn get_parquet_meta_data_from_mem_cache(
@@ -330,6 +387,17 @@ impl CacheManager {
         })
     }
 
+    /// Gets cached [ParquetMetaData] from in-memory cache without updating global metrics.
+    /// This is used by `get_parquet_meta_data_with_metrics` to avoid double counting.
+    fn get_parquet_meta_data_from_mem_cache_inner(
+        &self,
+        file_id: RegionFileId,
+    ) -> Option<Arc<ParquetMetaData>> {
+        self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
+            sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()))
+        })
+    }
+
     /// Puts [ParquetMetaData] into the cache.
     pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
         if let Some(cache) = &self.sst_meta_cache {
diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs
index 56a5914352..b62d8b5229 100644
--- a/src/mito2/src/read/scan_util.rs
+++ b/src/mito2/src/read/scan_util.rs
@@ -310,6 +310,7 @@ impl ScanMetricsSet {
             num_batches,
             num_rows,
             scan_cost: _,
+            metadata_cache_metrics: _,
         } = other;
 
         self.build_parts_cost += *build_cost;
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index eca85c3472..aacfd53cdf 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -253,7 +253,9 @@ impl ParquetReaderBuilder {
         let file_size = self.file_handle.meta_ref().file_size;
 
         // Loads parquet metadata of the file.
-        let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?;
+        let parquet_meta = self
+            .read_parquet_metadata(&file_path, file_size, &mut metrics.metadata_cache_metrics)
+            .await?;
         // Decodes region metadata.
         let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
         // Gets the metadata stored in the SST.
@@ -378,25 +380,34 @@ impl ParquetReaderBuilder {
         &self,
         file_path: &str,
         file_size: u64,
+        cache_metrics: &mut MetadataCacheMetrics,
     ) -> Result<Arc<ParquetMetaData>> {
+        let start = Instant::now();
         let _t = READ_STAGE_ELAPSED
             .with_label_values(&["read_parquet_metadata"])
             .start_timer();
 
         let file_id = self.file_handle.file_id();
-        // Tries to get from global cache.
-        if let Some(metadata) = self.cache_strategy.get_parquet_meta_data(file_id).await {
+        // Tries to get from cache with metrics tracking.
+        if let Some(metadata) = self
+            .cache_strategy
+            .get_parquet_meta_data_with_metrics(file_id, cache_metrics)
+            .await
+        {
+            cache_metrics.metadata_load_cost += start.elapsed();
             return Ok(metadata);
         }
 
         // Cache miss, load metadata directly.
         let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
         let metadata = metadata_loader.load().await?;
+
         let metadata = Arc::new(metadata);
         // Cache the metadata.
         self.cache_strategy
             .put_parquet_meta_data(file_id, metadata.clone());
 
+        cache_metrics.metadata_load_cost += start.elapsed();
         Ok(metadata)
     }
 
@@ -991,6 +1002,32 @@ impl ReaderFilterMetrics {
     }
 }
 
+/// Metrics for parquet metadata cache operations.
+#[derive(Debug, Default, Clone, Copy)]
+pub(crate) struct MetadataCacheMetrics {
+    /// Number of memory cache hits for parquet metadata.
+    pub(crate) mem_cache_hit: usize,
+    /// Number of memory cache misses for parquet metadata.
+    pub(crate) mem_cache_miss: usize,
+    /// Number of file cache hits for parquet metadata.
+    pub(crate) file_cache_hit: usize,
+    /// Number of file cache misses for parquet metadata.
+    pub(crate) file_cache_miss: usize,
+    /// Duration to load parquet metadata.
+    pub(crate) metadata_load_cost: Duration,
+}
+
+impl MetadataCacheMetrics {
+    /// Adds `other` metrics to this metrics.
+    pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
+        self.mem_cache_hit += other.mem_cache_hit;
+        self.mem_cache_miss += other.mem_cache_miss;
+        self.file_cache_hit += other.file_cache_hit;
+        self.file_cache_miss += other.file_cache_miss;
+        self.metadata_load_cost += other.metadata_load_cost;
+    }
+}
+
 /// Parquet reader metrics.
 #[derive(Debug, Default, Clone)]
 pub struct ReaderMetrics {
@@ -1006,6 +1043,8 @@ pub struct ReaderMetrics {
     pub(crate) num_batches: usize,
     /// Number of rows read.
     pub(crate) num_rows: usize,
+    /// Metrics for parquet metadata cache.
+    pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
 }
 
 impl ReaderMetrics {
@@ -1017,6 +1056,8 @@ impl ReaderMetrics {
         self.num_record_batches += other.num_record_batches;
         self.num_batches += other.num_batches;
         self.num_rows += other.num_rows;
+        self.metadata_cache_metrics
+            .merge_from(&other.metadata_cache_metrics);
     }
 
     /// Reports total rows.