diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 84d725a257..2f375ed0f2 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -37,7 +37,6 @@ use datatypes::schema::Schema; use datatypes::schema::ext::ArrowSchemaExt; use datatypes::types::json_type::JsonNativeType; use futures::StreamExt; -use parquet::file::metadata::PageIndexPolicy; use partition::expr::PartitionExpr; use smallvec::SmallVec; use snafu::ResultExt; @@ -1098,7 +1097,7 @@ impl ScanInput { .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone()) .fulltext_index_appliers(self.fulltext_index_appliers.clone()); let reader = if !self.compaction && may_build_selective_row_selection { - reader.page_index_policy(PageIndexPolicy::Optional) + reader.deferred_optional_page_index() } else { reader }; diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 2128c52186..53cb51e861 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -156,6 +156,7 @@ pub struct ParquetReaderBuilder { /// Whether to decode primary key values eagerly when reading primary key format SSTs. decode_primary_key_values: bool, page_index_policy: PageIndexPolicy, + defer_optional_page_index: bool, } impl ParquetReaderBuilder { @@ -186,6 +187,7 @@ impl ParquetReaderBuilder { pre_filter_mode: PreFilterMode::All, decode_primary_key_values: false, page_index_policy: Default::default(), + defer_optional_page_index: false, } } @@ -289,6 +291,14 @@ impl ParquetReaderBuilder { self } + /// Defers loading optional page indexes until row-level selections can use them. + #[must_use] + pub(crate) fn deferred_optional_page_index(mut self) -> Self { + self.page_index_policy = PageIndexPolicy::Optional; + self.defer_optional_page_index = true; + self + } + /// Builds a [ParquetReader]. /// /// This needs to perform IO operation. @@ -337,15 +347,22 @@ impl ParquetReaderBuilder { let file_size = self.file_handle.meta_ref().file_size; // Loads parquet metadata of the file. - let (sst_meta, cache_miss) = self + let initial_page_index_policy = if self.defer_optional_page_index + && self.page_index_policy == PageIndexPolicy::Optional + { + PageIndexPolicy::Skip + } else { + self.page_index_policy + }; + let (sst_meta, mut cache_miss) = self .read_parquet_metadata( &file_path, file_size, &mut metrics.metadata_cache_metrics, - self.page_index_policy, + initial_page_index_policy, ) .await?; - let parquet_meta = sst_meta.parquet_metadata(); + let mut parquet_meta = sst_meta.parquet_metadata(); let region_meta = sst_meta.region_metadata(); let region_partition_expr_str = self .expected_metadata @@ -415,38 +432,12 @@ impl ParquetReaderBuilder { return Ok(None); } - // Trigger background download if metadata had a cache miss and selection is not empty - if cache_miss && !selection.is_empty() { - use crate::cache::file_cache::{FileType, IndexKey}; - let index_key = IndexKey::new( - self.file_handle.region_id(), - self.file_handle.file_id().file_id(), - FileType::Parquet, - ); - self.cache_strategy.maybe_download_background( - index_key, - file_path.clone(), - self.object_store.clone(), - file_size, - ); - } - let prune_schema = self .expected_metadata .as_ref() .map(|meta| meta.schema.clone()) .unwrap_or_else(|| region_meta.schema.clone()); - // Create ArrowReaderMetadata for async stream building. - let mut arrow_reader_options = ArrowReaderOptions::new(); - if !read_format.arrow_schema().has_json_extension_field() { - arrow_reader_options = - arrow_reader_options.with_schema(read_format.arrow_schema().clone()); - } - let arrow_metadata = - ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options) - .context(ReadDataPartSnafu)?; - let dyn_filters = if let Some(predicate) = &self.predicate { predicate.dyn_filters().as_ref().clone() } else { @@ -464,6 +455,49 @@ impl ParquetReaderBuilder { &codec, ); + if self.defer_optional_page_index + && self.page_index_policy == PageIndexPolicy::Optional + && (filter_plan.prefilter_builder.is_some() + || has_row_level_selection(&selection, &parquet_meta)) + { + let (sst_meta, page_index_cache_miss) = self + .read_parquet_metadata( + &file_path, + file_size, + &mut metrics.metadata_cache_metrics, + PageIndexPolicy::Optional, + ) + .await?; + parquet_meta = sst_meta.parquet_metadata(); + cache_miss |= page_index_cache_miss; + } + + // Trigger background download if metadata had a cache miss and selection is not empty + if cache_miss && !selection.is_empty() { + use crate::cache::file_cache::{FileType, IndexKey}; + let index_key = IndexKey::new( + self.file_handle.region_id(), + self.file_handle.file_id().file_id(), + FileType::Parquet, + ); + self.cache_strategy.maybe_download_background( + index_key, + file_path.clone(), + self.object_store.clone(), + file_size, + ); + } + + // Create ArrowReaderMetadata for async stream building. + let mut arrow_reader_options = ArrowReaderOptions::new(); + if !read_format.arrow_schema().has_json_extension_field() { + arrow_reader_options = + arrow_reader_options.with_schema(read_format.arrow_schema().clone()); + } + let arrow_metadata = + ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options) + .context(ReadDataPartSnafu)?; + let output_schema = read_format.output_arrow_schema()?; let reader_builder = RowGroupReaderBuilder { @@ -1215,6 +1249,18 @@ impl ParquetReaderBuilder { } } } + +fn has_row_level_selection(selection: &RowGroupSelection, parquet_meta: &ParquetMetaData) -> bool { + selection.iter().any(|(row_group_idx, row_selection)| { + let Some(row_group) = parquet_meta.row_groups().get(*row_group_idx) else { + return false; + }; + + row_selection.row_count() != row_group.num_rows() as usize + || row_selection.iter().any(|selector| selector.skip) + }) +} + fn apply_selection_and_update_metrics( output: &mut RowGroupSelection, result: &RowGroupSelection, @@ -2176,6 +2222,7 @@ mod tests { use datatypes::schema::ColumnSchema; use object_store::services::Memory; use parquet::arrow::ArrowWriter; + use parquet::file::properties::WriterProperties; use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; use store_api::region_request::PathType; use store_api::storage::RegionId; @@ -2281,6 +2328,39 @@ mod tests { assert!(!selection.is_empty()); } + #[tokio::test(flavor = "current_thread")] + async fn test_has_row_level_selection() { + let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); + let file_path = "row_level_selection.parquet"; + + let col = Arc::new(Int64Array::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef; + let batch = RecordBatch::try_from_iter([("col", col)]).unwrap(); + let props = WriterProperties::builder() + .set_max_row_group_row_count(Some(3)) + .build(); + let mut parquet_bytes = Vec::new(); + let mut writer = + ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + let file_size = parquet_bytes.len() as u64; + object_store.write(file_path, parquet_bytes).await.unwrap(); + + let mut cache_metrics = MetadataCacheMetrics::default(); + let loader = MetadataLoader::new(object_store, file_path, file_size); + let parquet_meta = loader.load(&mut cache_metrics).await.unwrap(); + assert_eq!(2, parquet_meta.num_row_groups()); + + let full_row_groups = RowGroupSelection::from_full_row_group_ids([0, 1], 3, 5); + assert!(!has_row_level_selection(&full_row_groups, &parquet_meta)); + + let prefix_selection = RowGroupSelection::from_row_ranges(vec![(0, vec![0..1, 1..2])], 3); + assert!(has_row_level_selection(&prefix_selection, &parquet_meta)); + + let interior_selection = RowGroupSelection::from_row_ranges(vec![(0, vec![1..2, 2..3])], 3); + assert!(has_row_level_selection(&interior_selection, &parquet_meta)); + } + fn expected_metadata_with_reused_tag_name( old_metadata: &RegionMetadata, ) -> Arc {