feat: add parquet metadata metrics

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-11-24 20:18:05 +08:00
committed by shuiyisong
parent c83a282b39
commit 69cf13b33a
3 changed files with 113 additions and 3 deletions

View File

@@ -44,6 +44,7 @@ use crate::cache::write_cache::WriteCacheRef;
use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
use crate::read::Batch;
use crate::sst::file::RegionFileId;
use crate::sst::parquet::reader::MetadataCacheMetrics;
/// Metrics type key for sst meta.
const SST_META_TYPE: &str = "sst_meta";
@@ -90,6 +91,32 @@ impl CacheStrategy {
}
}
/// Looks up the parquet metadata of `file_id` and records the lookup outcome in `metrics`.
///
/// For the `EnableAll` and `Compaction` strategies the lookup is delegated to
/// [CacheManager::get_parquet_meta_data_with_metrics()]. For the `Disabled` strategy
/// no cache is consulted: the call counts as a miss for both the memory cache and
/// the file cache and returns `None`.
pub(crate) async fn get_parquet_meta_data_with_metrics(
    &self,
    file_id: RegionFileId,
    metrics: &mut MetadataCacheMetrics,
) -> Option<Arc<ParquetMetaData>> {
    let manager = match self {
        CacheStrategy::EnableAll(manager) | CacheStrategy::Compaction(manager) => manager,
        CacheStrategy::Disabled => {
            // Without a cache every lookup is a miss on both levels.
            metrics.mem_cache_miss += 1;
            metrics.file_cache_miss += 1;
            return None;
        }
    };

    manager
        .get_parquet_meta_data_with_metrics(file_id, metrics)
        .await
}
/// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
pub fn get_parquet_meta_data_from_mem_cache(
&self,
@@ -317,6 +344,36 @@ impl CacheManager {
None
}
/// Gets cached [ParquetMetaData] with metrics tracking.
/// Tries in-memory cache first, then file cache, updating metrics accordingly.
pub(crate) async fn get_parquet_meta_data_with_metrics(
&self,
file_id: RegionFileId,
metrics: &mut MetadataCacheMetrics,
) -> Option<Arc<ParquetMetaData>> {
// Try to get metadata from sst meta cache
if let Some(metadata) = self.get_parquet_meta_data_from_mem_cache_inner(file_id) {
metrics.mem_cache_hit += 1;
return Some(metadata);
}
metrics.mem_cache_miss += 1;
// Try to get metadata from write cache
let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
if let Some(write_cache) = &self.write_cache
&& let Some(metadata) = write_cache.file_cache().get_parquet_meta_data(key).await
{
metrics.file_cache_hit += 1;
let metadata = Arc::new(metadata);
// Put metadata into sst meta cache
self.put_parquet_meta_data(file_id, metadata.clone());
return Some(metadata);
};
metrics.file_cache_miss += 1;
None
}
/// Gets cached [ParquetMetaData] from in-memory cache.
/// This method does not perform I/O.
pub fn get_parquet_meta_data_from_mem_cache(
@@ -330,6 +387,17 @@ impl CacheManager {
})
}
/// Looks up [ParquetMetaData] in the in-memory sst meta cache only.
///
/// Unlike the public lookup, this helper does not touch the global cache metrics;
/// `get_parquet_meta_data_with_metrics` relies on it to avoid double counting.
/// Returns `None` when the sst meta cache is not configured or holds no entry for
/// `file_id`.
fn get_parquet_meta_data_from_mem_cache_inner(
    &self,
    file_id: RegionFileId,
) -> Option<Arc<ParquetMetaData>> {
    let cache = self.sst_meta_cache.as_ref()?;
    let key = SstMetaKey(file_id.region_id(), file_id.file_id());
    cache.get(&key)
}
/// Puts [ParquetMetaData] into the cache.
pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
if let Some(cache) = &self.sst_meta_cache {

View File

@@ -310,6 +310,7 @@ impl ScanMetricsSet {
num_batches,
num_rows,
scan_cost: _,
metadata_cache_metrics: _,
} = other;
self.build_parts_cost += *build_cost;

View File

@@ -253,7 +253,9 @@ impl ParquetReaderBuilder {
let file_size = self.file_handle.meta_ref().file_size;
// Loads parquet metadata of the file.
let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?;
let parquet_meta = self
.read_parquet_metadata(&file_path, file_size, &mut metrics.metadata_cache_metrics)
.await?;
// Decodes region metadata.
let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
// Gets the metadata stored in the SST.
@@ -378,25 +380,34 @@ impl ParquetReaderBuilder {
&self,
file_path: &str,
file_size: u64,
cache_metrics: &mut MetadataCacheMetrics,
) -> Result<Arc<ParquetMetaData>> {
let start = Instant::now();
let _t = READ_STAGE_ELAPSED
.with_label_values(&["read_parquet_metadata"])
.start_timer();
let file_id = self.file_handle.file_id();
// Tries to get from global cache.
if let Some(metadata) = self.cache_strategy.get_parquet_meta_data(file_id).await {
// Tries to get from cache with metrics tracking.
if let Some(metadata) = self
.cache_strategy
.get_parquet_meta_data_with_metrics(file_id, cache_metrics)
.await
{
cache_metrics.metadata_load_cost += start.elapsed();
return Ok(metadata);
}
// Cache miss, load metadata directly.
let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
let metadata = metadata_loader.load().await?;
let metadata = Arc::new(metadata);
// Cache the metadata.
self.cache_strategy
.put_parquet_meta_data(file_id, metadata.clone());
cache_metrics.metadata_load_cost += start.elapsed();
Ok(metadata)
}
@@ -991,6 +1002,32 @@ impl ReaderFilterMetrics {
}
}
/// Per-reader counters describing how parquet metadata lookups were served.
///
/// All counters start at zero (see `Default`) and are meant to be accumulated,
/// either directly by the cache lookup path or via [MetadataCacheMetrics::merge_from()].
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct MetadataCacheMetrics {
    /// How many lookups were answered by the in-memory cache.
    pub(crate) mem_cache_hit: usize,
    /// How many lookups were not answered by the in-memory cache.
    pub(crate) mem_cache_miss: usize,
    /// How many lookups were answered by the file cache.
    pub(crate) file_cache_hit: usize,
    /// How many lookups were not answered by the file cache.
    pub(crate) file_cache_miss: usize,
    /// Accumulated time spent obtaining parquet metadata.
    pub(crate) metadata_load_cost: Duration,
}

impl MetadataCacheMetrics {
    /// Accumulates every field of `other` into `self`; `other` is left untouched.
    pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
        // Destructure exhaustively so that a newly added field cannot be silently
        // left out of the merge.
        let MetadataCacheMetrics {
            mem_cache_hit,
            mem_cache_miss,
            file_cache_hit,
            file_cache_miss,
            metadata_load_cost,
        } = *other;

        self.mem_cache_hit += mem_cache_hit;
        self.mem_cache_miss += mem_cache_miss;
        self.file_cache_hit += file_cache_hit;
        self.file_cache_miss += file_cache_miss;
        self.metadata_load_cost += metadata_load_cost;
    }
}
/// Parquet reader metrics.
#[derive(Debug, Default, Clone)]
pub struct ReaderMetrics {
@@ -1006,6 +1043,8 @@ pub struct ReaderMetrics {
pub(crate) num_batches: usize,
/// Number of rows read.
pub(crate) num_rows: usize,
/// Metrics for parquet metadata cache.
pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
}
impl ReaderMetrics {
@@ -1017,6 +1056,8 @@ impl ReaderMetrics {
self.num_record_batches += other.num_record_batches;
self.num_batches += other.num_batches;
self.num_rows += other.num_rows;
self.metadata_cache_metrics
.merge_from(&other.metadata_cache_metrics);
}
/// Reports total rows.