From ab3cadb4b576d24347a4e1548611b2d756b914da Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 21 May 2026 14:06:44 +0800 Subject: [PATCH] feat: use and cache page index from sst meta (#8139) * feat: cache sst meta with policy aware Signed-off-by: Ruihang Xia * load page index when necessary Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/mito2/src/cache.rs | 106 +++++++++++++++++++- src/mito2/src/cache/file_cache.rs | 13 ++- src/mito2/src/read/scan_region.rs | 6 ++ src/mito2/src/sst/parquet/reader.rs | 145 ++++++++++++++++++++++------ 4 files changed, 231 insertions(+), 39 deletions(-) diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 48881e5f05..c05db5b989 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -148,17 +148,35 @@ pub(crate) struct CachedSstMeta { parquet_metadata: Arc, region_metadata: RegionMetadataRef, region_metadata_weight: usize, + page_index_policy: PageIndexPolicy, } impl CachedSstMeta { + #[cfg(test)] pub(crate) fn try_new(file_path: &str, parquet_metadata: ParquetMetaData) -> Result { - Self::try_new_with_region_metadata(file_path, parquet_metadata, None) + let page_index_policy = infer_loaded_page_index_policy(&parquet_metadata); + Self::try_new_with_page_index_policy(file_path, parquet_metadata, None, page_index_policy) } pub(crate) fn try_new_with_region_metadata( file_path: &str, parquet_metadata: ParquetMetaData, region_metadata: Option, + ) -> Result { + let page_index_policy = infer_loaded_page_index_policy(&parquet_metadata); + Self::try_new_with_page_index_policy( + file_path, + parquet_metadata, + region_metadata, + page_index_policy, + ) + } + + pub(crate) fn try_new_with_page_index_policy( + file_path: &str, + parquet_metadata: ParquetMetaData, + region_metadata: Option, + page_index_policy: PageIndexPolicy, ) -> Result { let file_metadata = parquet_metadata.file_metadata(); let key_values = file_metadata @@ -196,6 +214,7 @@ impl CachedSstMeta { parquet_metadata, region_metadata, region_metadata_weight, + page_index_policy, }) } @@ -206,6 +225,22 @@ impl CachedSstMeta { pub(crate) fn region_metadata(&self) -> RegionMetadataRef { self.region_metadata.clone() } + + fn satisfies_page_index_policy(&self, requested: PageIndexPolicy) -> bool { + match requested { + PageIndexPolicy::Skip => true, + PageIndexPolicy::Optional => self.page_index_policy != PageIndexPolicy::Skip, + PageIndexPolicy::Required => self.page_index_policy == PageIndexPolicy::Required, + } + } +} + +fn infer_loaded_page_index_policy(parquet_metadata: &ParquetMetaData) -> PageIndexPolicy { + if parquet_metadata.column_index().is_some() || parquet_metadata.offset_index().is_some() { + PageIndexPolicy::Optional + } else { + PageIndexPolicy::Skip + } } fn strip_region_metadata_from_parquet(parquet_metadata: ParquetMetaData) -> ParquetMetaData { @@ -279,10 +314,11 @@ impl CacheStrategy { pub(crate) fn get_sst_meta_data_from_mem_cache( &self, file_id: RegionFileId, + page_index_policy: PageIndexPolicy, ) -> Option> { match self { CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => { - cache_manager.get_sst_meta_data_from_mem_cache(file_id) + cache_manager.get_sst_meta_data_from_mem_cache(file_id, page_index_policy) } CacheStrategy::Disabled => None, } @@ -293,7 +329,7 @@ impl CacheStrategy { &self, file_id: RegionFileId, ) -> Option> { - self.get_sst_meta_data_from_mem_cache(file_id) + self.get_sst_meta_data_from_mem_cache(file_id, PageIndexPolicy::Skip) .map(|metadata| metadata.parquet_metadata()) } @@ -592,7 +628,7 @@ impl CacheManager { metrics: &mut MetadataCacheMetrics, page_index_policy: PageIndexPolicy, ) -> Option> { - if let Some(metadata) = self.get_sst_meta_data_from_mem_cache(file_id) { + if let Some(metadata) = self.get_sst_meta_data_from_mem_cache(file_id, page_index_policy) { metrics.mem_cache_hit += 1; return Some(metadata); } @@ -631,9 +667,12 @@ impl CacheManager { pub(crate) fn get_sst_meta_data_from_mem_cache( &self, file_id: RegionFileId, + page_index_policy: PageIndexPolicy, ) -> Option> { self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| { let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id())); + let value = + value.filter(|metadata| metadata.satisfies_page_index_policy(page_index_policy)); update_hit_miss(value, SST_META_TYPE) }) } @@ -644,7 +683,7 @@ impl CacheManager { &self, file_id: RegionFileId, ) -> Option> { - self.get_sst_meta_data_from_mem_cache(file_id) + self.get_sst_meta_data_from_mem_cache(file_id, PageIndexPolicy::Skip) .map(|metadata| metadata.parquet_metadata()) } @@ -1388,6 +1427,63 @@ mod tests { assert!(Arc::ptr_eq(®ion_metadata, &cached.region_metadata())); } + #[tokio::test] + async fn test_parquet_meta_cache_respects_page_index_policy() { + let cache = CacheManager::builder().sst_meta_cache_size(2000).build(); + let region_id = RegionId::new(1, 1); + let file_id = RegionFileId::new(region_id, FileId::random()); + let (metadata, _) = sst_parquet_meta(); + + let skip_metadata = Arc::new( + CachedSstMeta::try_new_with_page_index_policy( + "test.parquet", + Arc::unwrap_or_clone(metadata.clone()), + None, + PageIndexPolicy::Skip, + ) + .unwrap(), + ); + cache.put_sst_meta_data(file_id, skip_metadata); + + let mut metrics = MetadataCacheMetrics::default(); + assert!( + cache + .get_sst_meta_data(file_id, &mut metrics, PageIndexPolicy::Optional) + .await + .is_none() + ); + assert_eq!(1, metrics.cache_miss); + + let optional_metadata = Arc::new( + CachedSstMeta::try_new_with_page_index_policy( + "test.parquet", + Arc::unwrap_or_clone(metadata), + None, + PageIndexPolicy::Optional, + ) + .unwrap(), + ); + cache.put_sst_meta_data(file_id, optional_metadata); + + let mut metrics = MetadataCacheMetrics::default(); + assert!( + cache + .get_sst_meta_data(file_id, &mut metrics, PageIndexPolicy::Optional) + .await + .is_some() + ); + assert_eq!(1, metrics.mem_cache_hit); + + let mut metrics = MetadataCacheMetrics::default(); + assert!( + cache + .get_sst_meta_data(file_id, &mut metrics, PageIndexPolicy::Skip) + .await + .is_some() + ); + assert_eq!(1, metrics.mem_cache_hit); + } + #[test] fn test_meta_cache_weight_accounts_for_decoded_region_metadata() { let region_metadata = Arc::new(wide_region_metadata(128)); diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index 9b987c810b..681b2f90e3 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -628,8 +628,13 @@ impl FileCache { let file_path = self.inner.cache_file_path(key); self.get_parquet_meta_data(key, cache_metrics, page_index_policy) .await - .and_then( - |metadata| match CachedSstMeta::try_new(&file_path, metadata) { + .and_then(|metadata| { + match CachedSstMeta::try_new_with_page_index_policy( + &file_path, + metadata, + None, + page_index_policy, + ) { Ok(metadata) => Some(Arc::new(metadata)), Err(err) => { CACHE_MISS @@ -641,8 +646,8 @@ impl FileCache { ); None } - }, - ) + } + }) } async fn get_reader(&self, file_path: &str) -> object_store::Result> { diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index fb30913534..2f375ed0f2 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -1080,6 +1080,7 @@ impl ScanInput { reader_metrics: &mut ReaderMetrics, ) -> Result { let predicate = self.predicate_for_file(file); + let may_build_selective_row_selection = predicate.is_some(); let decode_pk_values = !self.compaction && self .mapper @@ -1095,6 +1096,11 @@ impl ScanInput { .inverted_index_appliers(self.inverted_index_appliers.clone()) .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone()) .fulltext_index_appliers(self.fulltext_index_appliers.clone()); + let reader = if !self.compaction && may_build_selective_row_selection { + reader.deferred_optional_page_index() + } else { + reader + }; #[cfg(feature = "vector_index")] let reader = { let mut reader = reader; diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 26d5cb7b61..53cb51e861 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -156,6 +156,7 @@ pub struct ParquetReaderBuilder { /// Whether to decode primary key values eagerly when reading primary key format SSTs. decode_primary_key_values: bool, page_index_policy: PageIndexPolicy, + defer_optional_page_index: bool, } impl ParquetReaderBuilder { @@ -186,6 +187,7 @@ impl ParquetReaderBuilder { pre_filter_mode: PreFilterMode::All, decode_primary_key_values: false, page_index_policy: Default::default(), + defer_optional_page_index: false, } } @@ -289,6 +291,14 @@ impl ParquetReaderBuilder { self } + /// Defers loading optional page indexes until row-level selections can use them. + #[must_use] + pub(crate) fn deferred_optional_page_index(mut self) -> Self { + self.page_index_policy = PageIndexPolicy::Optional; + self.defer_optional_page_index = true; + self + } + /// Builds a [ParquetReader]. /// /// This needs to perform IO operation. @@ -337,15 +347,22 @@ impl ParquetReaderBuilder { let file_size = self.file_handle.meta_ref().file_size; // Loads parquet metadata of the file. - let (sst_meta, cache_miss) = self + let initial_page_index_policy = if self.defer_optional_page_index + && self.page_index_policy == PageIndexPolicy::Optional + { + PageIndexPolicy::Skip + } else { + self.page_index_policy + }; + let (sst_meta, mut cache_miss) = self .read_parquet_metadata( &file_path, file_size, &mut metrics.metadata_cache_metrics, - self.page_index_policy, + initial_page_index_policy, ) .await?; - let parquet_meta = sst_meta.parquet_metadata(); + let mut parquet_meta = sst_meta.parquet_metadata(); let region_meta = sst_meta.region_metadata(); let region_partition_expr_str = self .expected_metadata @@ -415,38 +432,12 @@ impl ParquetReaderBuilder { return Ok(None); } - // Trigger background download if metadata had a cache miss and selection is not empty - if cache_miss && !selection.is_empty() { - use crate::cache::file_cache::{FileType, IndexKey}; - let index_key = IndexKey::new( - self.file_handle.region_id(), - self.file_handle.file_id().file_id(), - FileType::Parquet, - ); - self.cache_strategy.maybe_download_background( - index_key, - file_path.clone(), - self.object_store.clone(), - file_size, - ); - } - let prune_schema = self .expected_metadata .as_ref() .map(|meta| meta.schema.clone()) .unwrap_or_else(|| region_meta.schema.clone()); - // Create ArrowReaderMetadata for async stream building. - let mut arrow_reader_options = ArrowReaderOptions::new(); - if !read_format.arrow_schema().has_json_extension_field() { - arrow_reader_options = - arrow_reader_options.with_schema(read_format.arrow_schema().clone()); - } - let arrow_metadata = - ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options) - .context(ReadDataPartSnafu)?; - let dyn_filters = if let Some(predicate) = &self.predicate { predicate.dyn_filters().as_ref().clone() } else { @@ -464,6 +455,49 @@ impl ParquetReaderBuilder { &codec, ); + if self.defer_optional_page_index + && self.page_index_policy == PageIndexPolicy::Optional + && (filter_plan.prefilter_builder.is_some() + || has_row_level_selection(&selection, &parquet_meta)) + { + let (sst_meta, page_index_cache_miss) = self + .read_parquet_metadata( + &file_path, + file_size, + &mut metrics.metadata_cache_metrics, + PageIndexPolicy::Optional, + ) + .await?; + parquet_meta = sst_meta.parquet_metadata(); + cache_miss |= page_index_cache_miss; + } + + // Trigger background download if metadata had a cache miss and selection is not empty + if cache_miss && !selection.is_empty() { + use crate::cache::file_cache::{FileType, IndexKey}; + let index_key = IndexKey::new( + self.file_handle.region_id(), + self.file_handle.file_id().file_id(), + FileType::Parquet, + ); + self.cache_strategy.maybe_download_background( + index_key, + file_path.clone(), + self.object_store.clone(), + file_size, + ); + } + + // Create ArrowReaderMetadata for async stream building. + let mut arrow_reader_options = ArrowReaderOptions::new(); + if !read_format.arrow_schema().has_json_extension_field() { + arrow_reader_options = + arrow_reader_options.with_schema(read_format.arrow_schema().clone()); + } + let arrow_metadata = + ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options) + .context(ReadDataPartSnafu)?; + let output_schema = read_format.output_arrow_schema()?; let reader_builder = RowGroupReaderBuilder { @@ -614,7 +648,12 @@ impl ParquetReaderBuilder { metadata_loader.with_page_index_policy(page_index_policy); let metadata = metadata_loader.load(cache_metrics).await?; - let metadata = Arc::new(CachedSstMeta::try_new(file_path, metadata)?); + let metadata = Arc::new(CachedSstMeta::try_new_with_page_index_policy( + file_path, + metadata, + None, + page_index_policy, + )?); // Cache the metadata. self.cache_strategy .put_sst_meta_data(file_id, metadata.clone()); @@ -1210,6 +1249,18 @@ impl ParquetReaderBuilder { } } } + +fn has_row_level_selection(selection: &RowGroupSelection, parquet_meta: &ParquetMetaData) -> bool { + selection.iter().any(|(row_group_idx, row_selection)| { + let Some(row_group) = parquet_meta.row_groups().get(*row_group_idx) else { + return false; + }; + + row_selection.row_count() != row_group.num_rows() as usize + || row_selection.iter().any(|selector| selector.skip) + }) +} + fn apply_selection_and_update_metrics( output: &mut RowGroupSelection, result: &RowGroupSelection, @@ -2171,6 +2222,7 @@ mod tests { use datatypes::schema::ColumnSchema; use object_store::services::Memory; use parquet::arrow::ArrowWriter; + use parquet::file::properties::WriterProperties; use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; use store_api::region_request::PathType; use store_api::storage::RegionId; @@ -2276,6 +2328,39 @@ mod tests { assert!(!selection.is_empty()); } + #[tokio::test(flavor = "current_thread")] + async fn test_has_row_level_selection() { + let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); + let file_path = "row_level_selection.parquet"; + + let col = Arc::new(Int64Array::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef; + let batch = RecordBatch::try_from_iter([("col", col)]).unwrap(); + let props = WriterProperties::builder() + .set_max_row_group_row_count(Some(3)) + .build(); + let mut parquet_bytes = Vec::new(); + let mut writer = + ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + let file_size = parquet_bytes.len() as u64; + object_store.write(file_path, parquet_bytes).await.unwrap(); + + let mut cache_metrics = MetadataCacheMetrics::default(); + let loader = MetadataLoader::new(object_store, file_path, file_size); + let parquet_meta = loader.load(&mut cache_metrics).await.unwrap(); + assert_eq!(2, parquet_meta.num_row_groups()); + + let full_row_groups = RowGroupSelection::from_full_row_group_ids([0, 1], 3, 5); + assert!(!has_row_level_selection(&full_row_groups, &parquet_meta)); + + let prefix_selection = RowGroupSelection::from_row_ranges(vec![(0, vec![0..1, 1..2])], 3); + assert!(has_row_level_selection(&prefix_selection, &parquet_meta)); + + let interior_selection = RowGroupSelection::from_row_ranges(vec![(0, vec![1..2, 2..3])], 3); + assert!(has_row_level_selection(&interior_selection, &parquet_meta)); + } + fn expected_metadata_with_reused_tag_name( old_metadata: &RegionMetadata, ) -> Arc {