From db68aea7c28f2099a897f13f8ef32c55c833ec18 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 19 May 2026 15:13:04 +0800 Subject: [PATCH] feat: cache sst meta with policy aware Signed-off-by: Ruihang Xia --- src/mito2/src/cache.rs | 106 ++++++++++++++++++++++++++-- src/mito2/src/cache/file_cache.rs | 13 ++-- src/mito2/src/read/scan_region.rs | 7 ++ src/mito2/src/sst/parquet/reader.rs | 7 +- 4 files changed, 123 insertions(+), 10 deletions(-) diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 48881e5f05..c05db5b989 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -148,17 +148,35 @@ pub(crate) struct CachedSstMeta { parquet_metadata: Arc, region_metadata: RegionMetadataRef, region_metadata_weight: usize, + page_index_policy: PageIndexPolicy, } impl CachedSstMeta { + #[cfg(test)] pub(crate) fn try_new(file_path: &str, parquet_metadata: ParquetMetaData) -> Result { - Self::try_new_with_region_metadata(file_path, parquet_metadata, None) + let page_index_policy = infer_loaded_page_index_policy(&parquet_metadata); + Self::try_new_with_page_index_policy(file_path, parquet_metadata, None, page_index_policy) } pub(crate) fn try_new_with_region_metadata( file_path: &str, parquet_metadata: ParquetMetaData, region_metadata: Option, + ) -> Result { + let page_index_policy = infer_loaded_page_index_policy(&parquet_metadata); + Self::try_new_with_page_index_policy( + file_path, + parquet_metadata, + region_metadata, + page_index_policy, + ) + } + + pub(crate) fn try_new_with_page_index_policy( + file_path: &str, + parquet_metadata: ParquetMetaData, + region_metadata: Option, + page_index_policy: PageIndexPolicy, ) -> Result { let file_metadata = parquet_metadata.file_metadata(); let key_values = file_metadata @@ -196,6 +214,7 @@ impl CachedSstMeta { parquet_metadata, region_metadata, region_metadata_weight, + page_index_policy, }) } @@ -206,6 +225,22 @@ impl CachedSstMeta { pub(crate) fn region_metadata(&self) -> RegionMetadataRef { 
self.region_metadata.clone() } + + fn satisfies_page_index_policy(&self, requested: PageIndexPolicy) -> bool { + match requested { + PageIndexPolicy::Skip => true, + PageIndexPolicy::Optional => self.page_index_policy != PageIndexPolicy::Skip, + PageIndexPolicy::Required => self.page_index_policy == PageIndexPolicy::Required, + } + } +} + +fn infer_loaded_page_index_policy(parquet_metadata: &ParquetMetaData) -> PageIndexPolicy { + if parquet_metadata.column_index().is_some() || parquet_metadata.offset_index().is_some() { + PageIndexPolicy::Optional + } else { + PageIndexPolicy::Skip + } } fn strip_region_metadata_from_parquet(parquet_metadata: ParquetMetaData) -> ParquetMetaData { @@ -279,10 +314,11 @@ impl CacheStrategy { pub(crate) fn get_sst_meta_data_from_mem_cache( &self, file_id: RegionFileId, + page_index_policy: PageIndexPolicy, ) -> Option> { match self { CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => { - cache_manager.get_sst_meta_data_from_mem_cache(file_id) + cache_manager.get_sst_meta_data_from_mem_cache(file_id, page_index_policy) } CacheStrategy::Disabled => None, } @@ -293,7 +329,7 @@ impl CacheStrategy { &self, file_id: RegionFileId, ) -> Option> { - self.get_sst_meta_data_from_mem_cache(file_id) + self.get_sst_meta_data_from_mem_cache(file_id, PageIndexPolicy::Skip) .map(|metadata| metadata.parquet_metadata()) } @@ -592,7 +628,7 @@ impl CacheManager { metrics: &mut MetadataCacheMetrics, page_index_policy: PageIndexPolicy, ) -> Option> { - if let Some(metadata) = self.get_sst_meta_data_from_mem_cache(file_id) { + if let Some(metadata) = self.get_sst_meta_data_from_mem_cache(file_id, page_index_policy) { metrics.mem_cache_hit += 1; return Some(metadata); } @@ -631,9 +667,12 @@ impl CacheManager { pub(crate) fn get_sst_meta_data_from_mem_cache( &self, file_id: RegionFileId, + page_index_policy: PageIndexPolicy, ) -> Option> { self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| { let value = 
sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id())); + let value = + value.filter(|metadata| metadata.satisfies_page_index_policy(page_index_policy)); update_hit_miss(value, SST_META_TYPE) }) } @@ -644,7 +683,7 @@ impl CacheManager { &self, file_id: RegionFileId, ) -> Option> { - self.get_sst_meta_data_from_mem_cache(file_id) + self.get_sst_meta_data_from_mem_cache(file_id, PageIndexPolicy::Skip) .map(|metadata| metadata.parquet_metadata()) } @@ -1388,6 +1427,63 @@ mod tests { assert!(Arc::ptr_eq(&region_metadata, &cached.region_metadata())); } + #[tokio::test] + async fn test_parquet_meta_cache_respects_page_index_policy() { + let cache = CacheManager::builder().sst_meta_cache_size(2000).build(); + let region_id = RegionId::new(1, 1); + let file_id = RegionFileId::new(region_id, FileId::random()); + let (metadata, _) = sst_parquet_meta(); + + let skip_metadata = Arc::new( + CachedSstMeta::try_new_with_page_index_policy( + "test.parquet", + Arc::unwrap_or_clone(metadata.clone()), + None, + PageIndexPolicy::Skip, + ) + .unwrap(), + ); + cache.put_sst_meta_data(file_id, skip_metadata); + + let mut metrics = MetadataCacheMetrics::default(); + assert!( + cache + .get_sst_meta_data(file_id, &mut metrics, PageIndexPolicy::Optional) + .await + .is_none() + ); + assert_eq!(1, metrics.cache_miss); + + let optional_metadata = Arc::new( + CachedSstMeta::try_new_with_page_index_policy( + "test.parquet", + Arc::unwrap_or_clone(metadata), + None, + PageIndexPolicy::Optional, + ) + .unwrap(), + ); + cache.put_sst_meta_data(file_id, optional_metadata); + + let mut metrics = MetadataCacheMetrics::default(); + assert!( + cache + .get_sst_meta_data(file_id, &mut metrics, PageIndexPolicy::Optional) + .await + .is_some() + ); + assert_eq!(1, metrics.mem_cache_hit); + + let mut metrics = MetadataCacheMetrics::default(); + assert!( + cache + .get_sst_meta_data(file_id, &mut metrics, PageIndexPolicy::Skip) + .await + .is_some() + ); + assert_eq!(1, metrics.mem_cache_hit); 
+ } + #[test] fn test_meta_cache_weight_accounts_for_decoded_region_metadata() { let region_metadata = Arc::new(wide_region_metadata(128)); diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index 9b987c810b..681b2f90e3 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -628,8 +628,13 @@ impl FileCache { let file_path = self.inner.cache_file_path(key); self.get_parquet_meta_data(key, cache_metrics, page_index_policy) .await - .and_then( - |metadata| match CachedSstMeta::try_new(&file_path, metadata) { + .and_then(|metadata| { + match CachedSstMeta::try_new_with_page_index_policy( + &file_path, + metadata, + None, + page_index_policy, + ) { Ok(metadata) => Some(Arc::new(metadata)), Err(err) => { CACHE_MISS @@ -641,8 +646,8 @@ impl FileCache { ); None } - }, - ) + } + }) } async fn get_reader(&self, file_path: &str) -> object_store::Result> { diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index fb30913534..84d725a257 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -37,6 +37,7 @@ use datatypes::schema::Schema; use datatypes::schema::ext::ArrowSchemaExt; use datatypes::types::json_type::JsonNativeType; use futures::StreamExt; +use parquet::file::metadata::PageIndexPolicy; use partition::expr::PartitionExpr; use smallvec::SmallVec; use snafu::ResultExt; @@ -1080,6 +1081,7 @@ impl ScanInput { reader_metrics: &mut ReaderMetrics, ) -> Result { let predicate = self.predicate_for_file(file); + let may_build_selective_row_selection = predicate.is_some(); let decode_pk_values = !self.compaction && self .mapper @@ -1095,6 +1097,11 @@ impl ScanInput { .inverted_index_appliers(self.inverted_index_appliers.clone()) .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone()) .fulltext_index_appliers(self.fulltext_index_appliers.clone()); + let reader = if !self.compaction && may_build_selective_row_selection { + 
reader.page_index_policy(PageIndexPolicy::Optional) + } else { + reader + }; #[cfg(feature = "vector_index")] let reader = { let mut reader = reader; diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 26d5cb7b61..2128c52186 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -614,7 +614,12 @@ impl ParquetReaderBuilder { metadata_loader.with_page_index_policy(page_index_policy); let metadata = metadata_loader.load(cache_metrics).await?; - let metadata = Arc::new(CachedSstMeta::try_new(file_path, metadata)?); + let metadata = Arc::new(CachedSstMeta::try_new_with_page_index_policy( + file_path, + metadata, + None, + page_index_policy, + )?); // Cache the metadata. self.cache_strategy .put_sst_meta_data(file_id, metadata.clone());