feat: support skipping fields in prune reader

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-10-28 17:34:36 +08:00
parent 946bd1b813
commit 9c800af75d
4 changed files with 96 additions and 18 deletions

View File

@@ -191,7 +191,10 @@ fn apply_combined_filters(
// First, apply predicate filters using the shared method.
if !context.base.filters.is_empty() {
let predicate_mask = context.base.compute_filter_mask_flat(&record_batch)?;
// BulkIterContext always uses PreFilterMode::All, so skip_fields should be false
let predicate_mask = context
.base
.compute_filter_mask_flat(&record_batch, false)?;
// If predicate filters out the entire batch, return None early
let Some(mask) = predicate_mask else {
return Ok(None);

View File

@@ -49,33 +49,40 @@ pub struct PruneReader {
context: FileRangeContextRef,
source: Source,
metrics: ReaderMetrics,
/// Whether to skip field filters for this row group.
skip_fields: bool,
}
impl PruneReader {
pub(crate) fn new_with_row_group_reader(
ctx: FileRangeContextRef,
reader: RowGroupReader,
skip_fields: bool,
) -> Self {
Self {
context: ctx,
source: Source::RowGroup(reader),
metrics: Default::default(),
skip_fields,
}
}
pub(crate) fn new_with_last_row_reader(
ctx: FileRangeContextRef,
reader: RowGroupLastRowCachedReader,
skip_fields: bool,
) -> Self {
Self {
context: ctx,
source: Source::LastRow(reader),
metrics: Default::default(),
skip_fields,
}
}
pub(crate) fn reset_source(&mut self, source: Source) {
pub(crate) fn reset_source(&mut self, source: Source, skip_fields: bool) {
    // The flag is replaced together with the source so that it always
    // describes the source currently being read.
    self.skip_fields = skip_fields;
    self.source = source;
}
/// Merge metrics with the inner reader and return the merged metrics.
@@ -117,7 +124,7 @@ impl PruneReader {
}
let num_rows_before_filter = batch.num_rows();
let Some(batch_filtered) = self.context.precise_filter(batch)? else {
let Some(batch_filtered) = self.context.precise_filter(batch, self.skip_fields)? else {
// the entire batch is filtered out
self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
return Ok(None);
@@ -257,17 +264,21 @@ pub struct FlatPruneReader {
context: FileRangeContextRef,
source: FlatSource,
metrics: ReaderMetrics,
/// Whether to skip field filters for this row group.
skip_fields: bool,
}
impl FlatPruneReader {
pub(crate) fn new_with_row_group_reader(
ctx: FileRangeContextRef,
reader: FlatRowGroupReader,
skip_fields: bool,
) -> Self {
Self {
context: ctx,
source: FlatSource::RowGroup(reader),
metrics: Default::default(),
skip_fields,
}
}
@@ -309,7 +320,10 @@ impl FlatPruneReader {
}
let num_rows_before_filter = record_batch.num_rows();
let Some(filtered_batch) = self.context.precise_filter_flat(record_batch)? else {
let Some(filtered_batch) = self
.context
.precise_filter_flat(record_batch, self.skip_fields)?
else {
// the entire batch is filtered out
self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
return Ok(None);

View File

@@ -143,6 +143,9 @@ impl FileRange {
false
};
// Compute skip_fields once for this row group
let skip_fields = self.context.should_skip_fields(self.row_group_idx);
let prune_reader = if use_last_row_reader {
// Row group is PUT only, use LastRowReader to skip unnecessary rows.
let reader = RowGroupLastRowCachedReader::new(
@@ -151,12 +154,13 @@ impl FileRange {
self.context.reader_builder.cache_strategy().clone(),
RowGroupReader::new(self.context.clone(), parquet_reader),
);
PruneReader::new_with_last_row_reader(self.context.clone(), reader)
PruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
} else {
// Row group contains DELETE, fallback to default reader.
PruneReader::new_with_row_group_reader(
self.context.clone(),
RowGroupReader::new(self.context.clone(), parquet_reader),
skip_fields,
)
};
@@ -171,9 +175,15 @@ impl FileRange {
.build(self.row_group_idx, self.row_selection.clone())
.await?;
// Compute skip_fields once for this row group
let skip_fields = self.context.should_skip_fields(self.row_group_idx);
let flat_row_group_reader = FlatRowGroupReader::new(self.context.clone(), parquet_reader);
let flat_prune_reader =
FlatPruneReader::new_with_row_group_reader(self.context.clone(), flat_row_group_reader);
let flat_prune_reader = FlatPruneReader::new_with_row_group_reader(
self.context.clone(),
flat_row_group_reader,
skip_fields,
);
Ok(flat_prune_reader)
}
@@ -252,13 +262,29 @@ impl FileRangeContext {
/// TRY THE BEST to perform pushed down predicate precisely on the input batch.
/// Return the filtered batch. If the entire batch is filtered out, return None.
pub(crate) fn precise_filter(&self, input: Batch) -> Result<Option<Batch>> {
self.base.precise_filter(input)
pub(crate) fn precise_filter(&self, input: Batch, skip_fields: bool) -> Result<Option<Batch>> {
    // Pure delegation: `self.base` does the filtering, and the caller's
    // `skip_fields` decision is forwarded unchanged.
    let base = &self.base;
    base.precise_filter(input, skip_fields)
}
/// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch.
pub(crate) fn precise_filter_flat(&self, input: RecordBatch) -> Result<Option<RecordBatch>> {
self.base.precise_filter_flat(input)
pub(crate) fn precise_filter_flat(
    &self,
    input: RecordBatch,
    skip_fields: bool,
) -> Result<Option<RecordBatch>> {
    // Pure delegation: `self.base` does the filtering, and the caller's
    // `skip_fields` decision is forwarded unchanged.
    let base = &self.base;
    base.precise_filter_flat(input, skip_fields)
}
/// Determines whether to skip field filters for the given row group.
///
/// The decision follows `self.base.pre_filter_mode`:
/// * `PreFilterMode::All` - never skip; every filter is applied.
/// * `PreFilterMode::SkipFields` - always skip field filters.
/// * `PreFilterMode::SkipFieldsOnDelete` - skip field filters only when the
///   row group contains a delete op.
///
/// If the delete status of the row group cannot be determined (the lookup
/// returns an error), this method conservatively returns `true`: skipping
/// field filters only means fewer rows are pre-filtered, while applying them
/// to a row group that does contain deletes is exactly what this mode exists
/// to avoid.
///
/// # Arguments
/// * `row_group_idx` - Index of the row group to check
pub(crate) fn should_skip_fields(&self, row_group_idx: usize) -> bool {
    match self.base.pre_filter_mode {
        PreFilterMode::All => false,
        PreFilterMode::SkipFields => true,
        PreFilterMode::SkipFieldsOnDelete => {
            // Unknown delete status is treated as "contains delete" so an
            // error never leads to field filters being applied by mistake.
            self.contains_delete(row_group_idx).unwrap_or(true)
        }
    }
}
/// Decodes parquet metadata and finds if row group contains delete op.
@@ -302,7 +328,15 @@ impl RangeBase {
///
/// When a filter is referencing primary key column, this method will decode
/// the primary key and put it into the batch.
pub(crate) fn precise_filter(&self, mut input: Batch) -> Result<Option<Batch>> {
///
/// # Arguments
/// * `input` - The batch to filter
/// * `skip_fields` - Whether to skip field filters based on PreFilterMode and row group delete status
pub(crate) fn precise_filter(
&self,
mut input: Batch,
skip_fields: bool,
) -> Result<Option<Batch>> {
let mut mask = BooleanBuffer::new_set(input.num_rows());
// Run filters one by one and combine their results
@@ -357,6 +391,10 @@ impl RangeBase {
}
}
SemanticType::Field => {
// Skip field filters if skip_fields is true
if skip_fields {
continue;
}
// Safety: Input is Batch so we are using primary key format.
let Some(field_index) = self
.read_format
@@ -387,8 +425,16 @@ impl RangeBase {
/// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch.
///
/// It assumes all necessary tags are already decoded from the primary key.
pub(crate) fn precise_filter_flat(&self, input: RecordBatch) -> Result<Option<RecordBatch>> {
let mask = self.compute_filter_mask_flat(&input)?;
///
/// # Arguments
/// * `input` - The RecordBatch to filter
/// * `skip_fields` - Whether to skip field filters based on PreFilterMode and row group delete status
pub(crate) fn precise_filter_flat(
&self,
input: RecordBatch,
skip_fields: bool,
) -> Result<Option<RecordBatch>> {
let mask = self.compute_filter_mask_flat(&input, skip_fields)?;
// If mask is None, the entire batch is filtered out
let Some(mask) = mask else {
@@ -409,9 +455,14 @@ impl RangeBase {
/// Computes the filter mask for the input RecordBatch based on pushed down predicates.
///
/// Returns `None` if the entire batch is filtered out, otherwise returns the boolean mask.
///
/// # Arguments
/// * `input` - The RecordBatch to compute mask for
/// * `skip_fields` - Whether to skip field filters based on PreFilterMode and row group delete status
pub(crate) fn compute_filter_mask_flat(
&self,
input: &RecordBatch,
skip_fields: bool,
) -> Result<Option<BooleanBuffer>> {
let mut mask = BooleanBuffer::new_set(input.num_rows());
@@ -437,6 +488,11 @@ impl RangeBase {
MaybeFilter::Pruned => return Ok(None),
};
// Skip field filters if skip_fields is true
if skip_fields && filter_ctx.semantic_type() == SemanticType::Field {
continue;
}
// Get the column directly by its projected index
let column_idx = flat_format.projected_index_by_id(filter_ctx.column_id());
if let Some(idx) = column_idx {

View File

@@ -1251,10 +1251,12 @@ impl BatchReader for ParquetReader {
.await?;
// Resets the parquet reader.
reader.reset_source(Source::RowGroup(RowGroupReader::new(
self.context.clone(),
parquet_reader,
)));
// Compute skip_fields for this row group
let skip_fields = self.context.should_skip_fields(row_group_idx);
reader.reset_source(
Source::RowGroup(RowGroupReader::new(self.context.clone(), parquet_reader)),
skip_fields,
);
if let Some(batch) = reader.next_batch().await? {
return Ok(Some(batch));
}
@@ -1307,9 +1309,12 @@ impl ParquetReader {
.reader_builder()
.build(row_group_idx, Some(row_selection))
.await?;
// Compute skip_fields once for this row group
let skip_fields = context.should_skip_fields(row_group_idx);
ReaderState::Readable(PruneReader::new_with_row_group_reader(
context.clone(),
RowGroupReader::new(context.clone(), parquet_reader),
skip_fields,
))
} else {
ReaderState::Exhausted(ReaderMetrics::default())