load page index when necessary

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-05-19 16:39:32 +08:00
parent db68aea7c2
commit fd247b6bde
2 changed files with 110 additions and 31 deletions

View File

@@ -37,7 +37,6 @@ use datatypes::schema::Schema;
use datatypes::schema::ext::ArrowSchemaExt;
use datatypes::types::json_type::JsonNativeType;
use futures::StreamExt;
use parquet::file::metadata::PageIndexPolicy;
use partition::expr::PartitionExpr;
use smallvec::SmallVec;
use snafu::ResultExt;
@@ -1098,7 +1097,7 @@ impl ScanInput {
.bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
.fulltext_index_appliers(self.fulltext_index_appliers.clone());
let reader = if !self.compaction && may_build_selective_row_selection {
reader.page_index_policy(PageIndexPolicy::Optional)
reader.deferred_optional_page_index()
} else {
reader
};

View File

@@ -156,6 +156,7 @@ pub struct ParquetReaderBuilder {
/// Whether to decode primary key values eagerly when reading primary key format SSTs.
decode_primary_key_values: bool,
page_index_policy: PageIndexPolicy,
defer_optional_page_index: bool,
}
impl ParquetReaderBuilder {
@@ -186,6 +187,7 @@ impl ParquetReaderBuilder {
pre_filter_mode: PreFilterMode::All,
decode_primary_key_values: false,
page_index_policy: Default::default(),
defer_optional_page_index: false,
}
}
@@ -289,6 +291,14 @@ impl ParquetReaderBuilder {
self
}
/// Requests optional page indexes, but postpones reading them.
///
/// Sets the page index policy to [`PageIndexPolicy::Optional`] and marks the
/// builder as deferred. With both set, the builder first reads the parquet
/// metadata with [`PageIndexPolicy::Skip`] and reloads it with the optional
/// page index only when a prefilter exists or the selection is row-level.
#[must_use]
pub(crate) fn deferred_optional_page_index(mut self) -> Self {
    // The flag only takes effect while the policy stays `Optional`.
    self.defer_optional_page_index = true;
    self.page_index_policy = PageIndexPolicy::Optional;
    self
}
/// Builds a [ParquetReader].
///
/// This needs to perform IO operations.
@@ -337,15 +347,22 @@ impl ParquetReaderBuilder {
let file_size = self.file_handle.meta_ref().file_size;
// Loads parquet metadata of the file.
let (sst_meta, cache_miss) = self
let initial_page_index_policy = if self.defer_optional_page_index
&& self.page_index_policy == PageIndexPolicy::Optional
{
PageIndexPolicy::Skip
} else {
self.page_index_policy
};
let (sst_meta, mut cache_miss) = self
.read_parquet_metadata(
&file_path,
file_size,
&mut metrics.metadata_cache_metrics,
self.page_index_policy,
initial_page_index_policy,
)
.await?;
let parquet_meta = sst_meta.parquet_metadata();
let mut parquet_meta = sst_meta.parquet_metadata();
let region_meta = sst_meta.region_metadata();
let region_partition_expr_str = self
.expected_metadata
@@ -415,38 +432,12 @@ impl ParquetReaderBuilder {
return Ok(None);
}
// Trigger background download if metadata had a cache miss and selection is not empty
if cache_miss && !selection.is_empty() {
use crate::cache::file_cache::{FileType, IndexKey};
let index_key = IndexKey::new(
self.file_handle.region_id(),
self.file_handle.file_id().file_id(),
FileType::Parquet,
);
self.cache_strategy.maybe_download_background(
index_key,
file_path.clone(),
self.object_store.clone(),
file_size,
);
}
let prune_schema = self
.expected_metadata
.as_ref()
.map(|meta| meta.schema.clone())
.unwrap_or_else(|| region_meta.schema.clone());
// Create ArrowReaderMetadata for async stream building.
let mut arrow_reader_options = ArrowReaderOptions::new();
if !read_format.arrow_schema().has_json_extension_field() {
arrow_reader_options =
arrow_reader_options.with_schema(read_format.arrow_schema().clone());
}
let arrow_metadata =
ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options)
.context(ReadDataPartSnafu)?;
let dyn_filters = if let Some(predicate) = &self.predicate {
predicate.dyn_filters().as_ref().clone()
} else {
@@ -464,6 +455,49 @@ impl ParquetReaderBuilder {
&codec,
);
if self.defer_optional_page_index
&& self.page_index_policy == PageIndexPolicy::Optional
&& (filter_plan.prefilter_builder.is_some()
|| has_row_level_selection(&selection, &parquet_meta))
{
let (sst_meta, page_index_cache_miss) = self
.read_parquet_metadata(
&file_path,
file_size,
&mut metrics.metadata_cache_metrics,
PageIndexPolicy::Optional,
)
.await?;
parquet_meta = sst_meta.parquet_metadata();
cache_miss |= page_index_cache_miss;
}
// Trigger background download if metadata had a cache miss and selection is not empty
if cache_miss && !selection.is_empty() {
use crate::cache::file_cache::{FileType, IndexKey};
let index_key = IndexKey::new(
self.file_handle.region_id(),
self.file_handle.file_id().file_id(),
FileType::Parquet,
);
self.cache_strategy.maybe_download_background(
index_key,
file_path.clone(),
self.object_store.clone(),
file_size,
);
}
// Create ArrowReaderMetadata for async stream building.
let mut arrow_reader_options = ArrowReaderOptions::new();
if !read_format.arrow_schema().has_json_extension_field() {
arrow_reader_options =
arrow_reader_options.with_schema(read_format.arrow_schema().clone());
}
let arrow_metadata =
ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options)
.context(ReadDataPartSnafu)?;
let output_schema = read_format.output_arrow_schema()?;
let reader_builder = RowGroupReaderBuilder {
@@ -1215,6 +1249,18 @@ impl ParquetReaderBuilder {
}
}
}
/// Returns `true` if `selection` picks only part of at least one row group.
///
/// A row group counts as partially selected when the number of selected rows
/// differs from the row count recorded in `parquet_meta`, or when its row
/// selection contains any skip selector. Row group indexes that are not
/// present in `parquet_meta` are ignored.
fn has_row_level_selection(selection: &RowGroupSelection, parquet_meta: &ParquetMetaData) -> bool {
    let row_groups = parquet_meta.row_groups();
    for (row_group_idx, row_selection) in selection.iter() {
        // Unknown row groups cannot contribute a row-level selection.
        let Some(row_group) = row_groups.get(*row_group_idx) else {
            continue;
        };

        let total_rows = row_group.num_rows() as usize;
        if row_selection.row_count() != total_rows {
            return true;
        }
        if row_selection.iter().any(|selector| selector.skip) {
            return true;
        }
    }
    false
}
fn apply_selection_and_update_metrics(
output: &mut RowGroupSelection,
result: &RowGroupSelection,
@@ -2176,6 +2222,7 @@ mod tests {
use datatypes::schema::ColumnSchema;
use object_store::services::Memory;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use store_api::region_request::PathType;
use store_api::storage::RegionId;
@@ -2281,6 +2328,39 @@ mod tests {
assert!(!selection.is_empty());
}
/// Checks that `has_row_level_selection` reports `false` for fully selected
/// row groups and `true` for selections covering only part of a row group.
#[tokio::test(flavor = "current_thread")]
async fn test_has_row_level_selection() {
    let file_path = "row_level_selection.parquet";
    let object_store = ObjectStore::new(Memory::default()).unwrap().finish();

    // Five rows written with a row-group row limit of three.
    let values = Arc::new(Int64Array::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef;
    let record_batch = RecordBatch::try_from_iter([("col", values)]).unwrap();
    let writer_props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(3))
        .build();

    let mut encoded = Vec::new();
    {
        let mut arrow_writer =
            ArrowWriter::try_new(&mut encoded, record_batch.schema(), Some(writer_props)).unwrap();
        arrow_writer.write(&record_batch).unwrap();
        arrow_writer.close().unwrap();
    }
    let file_size = encoded.len() as u64;
    object_store.write(file_path, encoded).await.unwrap();

    // Load the metadata back through the regular loader.
    let mut cache_metrics = MetadataCacheMetrics::default();
    let parquet_meta = MetadataLoader::new(object_store, file_path, file_size)
        .load(&mut cache_metrics)
        .await
        .unwrap();
    assert_eq!(2, parquet_meta.num_row_groups());

    // Both row groups selected in full: nothing is row-level.
    let whole_groups = RowGroupSelection::from_full_row_group_ids([0, 1], 3, 5);
    assert!(!has_row_level_selection(&whole_groups, &parquet_meta));

    // Only the leading rows of row group 0 are selected.
    let leading_rows = RowGroupSelection::from_row_ranges(vec![(0, vec![0..1, 1..2])], 3);
    assert!(has_row_level_selection(&leading_rows, &parquet_meta));

    // Selection starts after the first row of row group 0.
    let trailing_rows = RowGroupSelection::from_row_ranges(vec![(0, vec![1..2, 2..3])], 3);
    assert!(has_row_level_selection(&trailing_rows, &parquet_meta));
}
fn expected_metadata_with_reused_tag_name(
old_metadata: &RegionMetadata,
) -> Arc<RegionMetadata> {