feat: support skipping field columns when pruning in RowGroupPruningStats

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-10-28 14:33:24 +08:00
parent a2f8893c9e
commit 53546ad59e
3 changed files with 27 additions and 6 deletions

View File

@@ -83,7 +83,7 @@ impl BulkIterContext {
let region_meta = self.base.read_format.metadata();
let row_groups = file_meta.row_groups();
// expected_metadata is set to None since we always expect the region metadata of the memtable to be up-to-date.
let stats = RowGroupPruningStats::new(row_groups, &self.base.read_format, None);
let stats = RowGroupPruningStats::new(row_groups, &self.base.read_format, None, false);
if let Some(predicate) = self.predicate.as_ref() {
predicate
.prune_with_stats(&stats, region_meta.schema.arrow_schema())

View File

@@ -487,7 +487,9 @@ impl ParquetReaderBuilder {
.apply_fine(self.file_handle.file_id(), Some(file_size_hint))
.await;
let selection = match apply_res {
Ok(Some(res)) => RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups),
Ok(Some(res)) => {
RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups)
}
Ok(None) => continue,
Err(err) => {
handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
@@ -614,7 +616,9 @@ impl ParquetReaderBuilder {
.apply(self.file_handle.file_id(), Some(file_size_hint), rgs)
.await;
let mut selection = match apply_res {
Ok(apply_output) => RowGroupSelection::from_row_ranges(apply_output, row_group_size),
Ok(apply_output) => {
RowGroupSelection::from_row_ranges(apply_output, row_group_size)
}
Err(err) => {
handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
continue;
@@ -727,8 +731,12 @@ impl ParquetReaderBuilder {
let region_meta = read_format.metadata();
let row_groups = parquet_meta.row_groups();
let stats =
RowGroupPruningStats::new(row_groups, read_format, self.expected_metadata.clone());
let stats = RowGroupPruningStats::new(
row_groups,
read_format,
self.expected_metadata.clone(),
false,
);
let prune_schema = self
.expected_metadata
.as_ref()

View File

@@ -18,6 +18,7 @@ use std::borrow::Borrow;
use std::collections::HashSet;
use std::sync::Arc;
use api::v1::SemanticType;
use datafusion_common::pruning::PruningStatistics;
use datafusion_common::{Column, ScalarValue};
use datatypes::arrow::array::{ArrayRef, BooleanArray, UInt64Array};
@@ -38,6 +39,8 @@ pub(crate) struct RowGroupPruningStats<'a, T> {
/// of the metadata in the SST to get the column id of a column as the SST may have
/// different columns.
expected_metadata: Option<RegionMetadataRef>,
/// If true, skip columns with Field semantic type during pruning.
skip_fields: bool,
}
impl<'a, T> RowGroupPruningStats<'a, T> {
@@ -46,22 +49,32 @@ impl<'a, T> RowGroupPruningStats<'a, T> {
row_groups: &'a [T],
read_format: &'a ReadFormat,
expected_metadata: Option<RegionMetadataRef>,
skip_fields: bool,
) -> Self {
Self {
row_groups,
read_format,
expected_metadata,
skip_fields,
}
}
/// Returns the column id of the column called `name` if it should take part in pruning.
///
/// The column is looked up in `expected_metadata` when that is set; otherwise the
/// metadata of `read_format` is used.
///
/// Returns `None` when the metadata has no column with that name, or when
/// `skip_fields` is true and the column's semantic type is `SemanticType::Field`.
fn column_id_to_prune(&self, name: &str) -> Option<ColumnId> {
let metadata = self
.expected_metadata
.as_ref()
.unwrap_or_else(|| self.read_format.metadata());
// NOTE(review): in this flattened diff view the next line appears to be the
// pre-change lookup (removed side); the statements after it look like its
// replacement — confirm against the raw diff.
metadata.column_by_name(name).map(|col| col.column_id)
// `?` returns `None` early when no column with this name exists.
let col = metadata.column_by_name(name)?;
// Skip field columns when skip_fields is enabled
if self.skip_fields && col.semantic_type == SemanticType::Field {
return None;
}
Some(col.column_id)
}
/// Returns the default value of all row groups for `column` according to the metadata.