diff --git a/src/mito2/src/memtable/bulk/part_reader.rs b/src/mito2/src/memtable/bulk/part_reader.rs index 5578018a8d..198573fc25 100644 --- a/src/mito2/src/memtable/bulk/part_reader.rs +++ b/src/mito2/src/memtable/bulk/part_reader.rs @@ -191,7 +191,10 @@ fn apply_combined_filters( // First, apply predicate filters using the shared method. if !context.base.filters.is_empty() { - let predicate_mask = context.base.compute_filter_mask_flat(&record_batch)?; + // BulkIterContext always uses PreFilterMode::All, so skip_fields should be false + let predicate_mask = context + .base + .compute_filter_mask_flat(&record_batch, false)?; // If predicate filters out the entire batch, return None early let Some(mask) = predicate_mask else { return Ok(None); diff --git a/src/mito2/src/read/prune.rs b/src/mito2/src/read/prune.rs index 413f787980..22cc9fb3ba 100644 --- a/src/mito2/src/read/prune.rs +++ b/src/mito2/src/read/prune.rs @@ -49,33 +49,40 @@ pub struct PruneReader { context: FileRangeContextRef, source: Source, metrics: ReaderMetrics, + /// Whether to skip field filters for this row group. + skip_fields: bool, } impl PruneReader { pub(crate) fn new_with_row_group_reader( ctx: FileRangeContextRef, reader: RowGroupReader, + skip_fields: bool, ) -> Self { Self { context: ctx, source: Source::RowGroup(reader), metrics: Default::default(), + skip_fields, } } pub(crate) fn new_with_last_row_reader( ctx: FileRangeContextRef, reader: RowGroupLastRowCachedReader, + skip_fields: bool, ) -> Self { Self { context: ctx, source: Source::LastRow(reader), metrics: Default::default(), + skip_fields, } } - pub(crate) fn reset_source(&mut self, source: Source) { + pub(crate) fn reset_source(&mut self, source: Source, skip_fields: bool) { self.source = source; + self.skip_fields = skip_fields; } /// Merge metrics with the inner reader and return the merged metrics. @@ -117,7 +124,7 @@ impl PruneReader { } let num_rows_before_filter = batch.num_rows(); - let Some(batch_filtered) = self.context.precise_filter(batch)? else { + let Some(batch_filtered) = self.context.precise_filter(batch, self.skip_fields)? else { // the entire batch is filtered out self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter; return Ok(None); @@ -257,17 +264,21 @@ pub struct FlatPruneReader { context: FileRangeContextRef, source: FlatSource, metrics: ReaderMetrics, + /// Whether to skip field filters for this row group. + skip_fields: bool, } impl FlatPruneReader { pub(crate) fn new_with_row_group_reader( ctx: FileRangeContextRef, reader: FlatRowGroupReader, + skip_fields: bool, ) -> Self { Self { context: ctx, source: FlatSource::RowGroup(reader), metrics: Default::default(), + skip_fields, } } @@ -309,7 +320,10 @@ impl FlatPruneReader { } let num_rows_before_filter = record_batch.num_rows(); - let Some(filtered_batch) = self.context.precise_filter_flat(record_batch)? else { + let Some(filtered_batch) = self + .context + .precise_filter_flat(record_batch, self.skip_fields)? + else { // the entire batch is filtered out self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter; return Ok(None); diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 97697517b6..6003ee6f87 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -143,6 +143,9 @@ impl FileRange { false }; + // Compute skip_fields once for this row group + let skip_fields = self.context.should_skip_fields(self.row_group_idx); + let prune_reader = if use_last_row_reader { // Row group is PUT only, use LastRowReader to skip unnecessary rows. let reader = RowGroupLastRowCachedReader::new( @@ -151,12 +154,13 @@ impl FileRange { self.context.reader_builder.cache_strategy().clone(), RowGroupReader::new(self.context.clone(), parquet_reader), ); - PruneReader::new_with_last_row_reader(self.context.clone(), reader) + PruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields) } else { // Row group contains DELETE, fallback to default reader. PruneReader::new_with_row_group_reader( self.context.clone(), RowGroupReader::new(self.context.clone(), parquet_reader), + skip_fields, ) }; @@ -171,9 +175,15 @@ impl FileRange { .build(self.row_group_idx, self.row_selection.clone()) .await?; + // Compute skip_fields once for this row group + let skip_fields = self.context.should_skip_fields(self.row_group_idx); + let flat_row_group_reader = FlatRowGroupReader::new(self.context.clone(), parquet_reader); - let flat_prune_reader = - FlatPruneReader::new_with_row_group_reader(self.context.clone(), flat_row_group_reader); + let flat_prune_reader = FlatPruneReader::new_with_row_group_reader( + self.context.clone(), + flat_row_group_reader, + skip_fields, + ); Ok(flat_prune_reader) } @@ -252,13 +262,29 @@ impl FileRangeContext { /// TRY THE BEST to perform pushed down predicate precisely on the input batch. /// Return the filtered batch. If the entire batch is filtered out, return None. - pub(crate) fn precise_filter(&self, input: Batch) -> Result> { - self.base.precise_filter(input) + pub(crate) fn precise_filter(&self, input: Batch, skip_fields: bool) -> Result> { + self.base.precise_filter(input, skip_fields) } /// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch. - pub(crate) fn precise_filter_flat(&self, input: RecordBatch) -> Result> { - self.base.precise_filter_flat(input) + pub(crate) fn precise_filter_flat( + &self, + input: RecordBatch, + skip_fields: bool, + ) -> Result> { + self.base.precise_filter_flat(input, skip_fields) + } + + /// Determines whether to skip field filters based on PreFilterMode and row group delete status. + pub(crate) fn should_skip_fields(&self, row_group_idx: usize) -> bool { + match self.base.pre_filter_mode { + PreFilterMode::All => false, + PreFilterMode::SkipFields => true, + PreFilterMode::SkipFieldsOnDelete => { + // Check if this specific row group contains delete op + self.contains_delete(row_group_idx).unwrap_or(false) + } + } } //// Decodes parquet metadata and finds if row group contains delete op. @@ -302,7 +328,15 @@ impl RangeBase { /// /// When a filter is referencing primary key column, this method will decode /// the primary key and put it into the batch. - pub(crate) fn precise_filter(&self, mut input: Batch) -> Result> { + /// + /// # Arguments + /// * `input` - The batch to filter + /// * `skip_fields` - Whether to skip field filters based on PreFilterMode and row group delete status + pub(crate) fn precise_filter( + &self, + mut input: Batch, + skip_fields: bool, + ) -> Result> { let mut mask = BooleanBuffer::new_set(input.num_rows()); // Run filter one by one and combine them result @@ -357,6 +391,10 @@ impl RangeBase { } } SemanticType::Field => { + // Skip field filters if skip_fields is true + if skip_fields { + continue; + } // Safety: Input is Batch so we are using primary key format. let Some(field_index) = self .read_format @@ -387,8 +425,16 @@ impl RangeBase { /// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch. /// /// It assumes all necessary tags are already decoded from the primary key. - pub(crate) fn precise_filter_flat(&self, input: RecordBatch) -> Result> { - let mask = self.compute_filter_mask_flat(&input)?; + /// + /// # Arguments + /// * `input` - The RecordBatch to filter + /// * `skip_fields` - Whether to skip field filters based on PreFilterMode and row group delete status + pub(crate) fn precise_filter_flat( + &self, + input: RecordBatch, + skip_fields: bool, + ) -> Result> { + let mask = self.compute_filter_mask_flat(&input, skip_fields)?; // If mask is None, the entire batch is filtered out let Some(mask) = mask else { @@ -409,9 +455,14 @@ impl RangeBase { /// Computes the filter mask for the input RecordBatch based on pushed down predicates. /// /// Returns `None` if the entire batch is filtered out, otherwise returns the boolean mask. + /// + /// # Arguments + /// * `input` - The RecordBatch to compute mask for + /// * `skip_fields` - Whether to skip field filters based on PreFilterMode and row group delete status pub(crate) fn compute_filter_mask_flat( &self, input: &RecordBatch, + skip_fields: bool, ) -> Result> { let mut mask = BooleanBuffer::new_set(input.num_rows()); @@ -437,6 +488,11 @@ impl RangeBase { MaybeFilter::Pruned => return Ok(None), }; + // Skip field filters if skip_fields is true + if skip_fields && filter_ctx.semantic_type() == SemanticType::Field { + continue; + } + // Get the column directly by its projected index let column_idx = flat_format.projected_index_by_id(filter_ctx.column_id()); if let Some(idx) = column_idx { diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 3f5eb9b687..7e5db08761 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -1251,10 +1251,12 @@ impl BatchReader for ParquetReader { .await?; // Resets the parquet reader. - reader.reset_source(Source::RowGroup(RowGroupReader::new( - self.context.clone(), - parquet_reader, - ))); + // Compute skip_fields for this row group + let skip_fields = self.context.should_skip_fields(row_group_idx); + reader.reset_source( + Source::RowGroup(RowGroupReader::new(self.context.clone(), parquet_reader)), + skip_fields, + ); if let Some(batch) = reader.next_batch().await? { return Ok(Some(batch)); } @@ -1307,9 +1309,12 @@ impl ParquetReader { .reader_builder() .build(row_group_idx, Some(row_selection)) .await?; + // Compute skip_fields once for this row group + let skip_fields = context.should_skip_fields(row_group_idx); ReaderState::Readable(PruneReader::new_with_row_group_reader( context.clone(), RowGroupReader::new(context.clone(), parquet_reader), + skip_fields, )) } else { ReaderState::Exhausted(ReaderMetrics::default())