diff --git a/src/mito2/src/memtable/bulk/context.rs b/src/mito2/src/memtable/bulk/context.rs index e4f8c47672..473b5e606f 100644 --- a/src/mito2/src/memtable/bulk/context.rs +++ b/src/mito2/src/memtable/bulk/context.rs @@ -43,6 +43,22 @@ impl BulkIterContext { projection: Option<&[ColumnId]>, predicate: Option, skip_auto_convert: bool, + ) -> Result { + Self::new_with_pre_filter_mode( + region_metadata, + projection, + predicate, + skip_auto_convert, + PreFilterMode::All, + ) + } + + pub fn new_with_pre_filter_mode( + region_metadata: RegionMetadataRef, + projection: Option<&[ColumnId]>, + predicate: Option, + skip_auto_convert: bool, + pre_filter_mode: PreFilterMode, ) -> Result { let codec = build_primary_key_codec(®ion_metadata); @@ -73,18 +89,23 @@ impl BulkIterContext { codec, // we don't need to compat batch since all batch in memtable have the same schema. compat_batch: None, - pre_filter_mode: PreFilterMode::All, + pre_filter_mode, }, predicate, }) } /// Prunes row groups by stats. - pub(crate) fn row_groups_to_read(&self, file_meta: &Arc) -> VecDeque { + pub(crate) fn row_groups_to_read( + &self, + file_meta: &Arc, + skip_fields: bool, + ) -> VecDeque { let region_meta = self.base.read_format.metadata(); let row_groups = file_meta.row_groups(); // expected_metadata is set to None since we always expect region metadata of memtable is up-to-date. - let stats = RowGroupPruningStats::new(row_groups, &self.base.read_format, None, false); + let stats = + RowGroupPruningStats::new(row_groups, &self.base.read_format, None, skip_fields); if let Some(predicate) = self.predicate.as_ref() { predicate .prune_with_stats(&stats, region_meta.schema.arrow_schema()) @@ -105,4 +126,9 @@ impl BulkIterContext { pub(crate) fn read_format(&self) -> &ReadFormat { &self.base.read_format } + + /// Returns the pre-filter mode. + pub(crate) fn pre_filter_mode(&self) -> PreFilterMode { + self.base.pre_filter_mode + } } diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index 21ac141cff..bf231ae112 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -65,6 +65,7 @@ use crate::memtable::bulk::context::BulkIterContextRef; use crate::memtable::bulk::part_reader::EncodedBulkPartIter; use crate::memtable::time_series::{ValueBuilder, Values}; use crate::sst::index::IndexOutput; +use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete}; use crate::sst::parquet::flat_format::primary_key_column_index; use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat}; use crate::sst::parquet::helper::parse_parquet_metadata; @@ -572,8 +573,13 @@ impl EncodedBulkPart { context: BulkIterContextRef, sequence: Option, ) -> Result> { + // Compute skip_fields for row group pruning using the same approach as compute_skip_fields in reader.rs. + let skip_fields_for_pruning = + Self::compute_skip_fields(context.pre_filter_mode(), &self.metadata.parquet_metadata); + // use predicate to find row groups to read. - let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata); + let row_groups_to_read = + context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning); if row_groups_to_read.is_empty() { // All row groups are filtered. @@ -589,6 +595,20 @@ impl EncodedBulkPart { )?; Ok(Some(Box::new(iter) as BoxedRecordBatchIterator)) } + + /// Computes whether to skip field columns based on PreFilterMode. + fn compute_skip_fields(pre_filter_mode: PreFilterMode, parquet_meta: &ParquetMetaData) -> bool { + match pre_filter_mode { + PreFilterMode::All => false, + PreFilterMode::SkipFields => true, + PreFilterMode::SkipFieldsOnDelete => { + // Check if any row group contains delete op + (0..parquet_meta.num_row_groups()).any(|rg_idx| { + row_group_contains_delete(parquet_meta, rg_idx, "memtable").unwrap_or(true) + }) + } + } + } } #[derive(Debug, Clone)] diff --git a/src/mito2/src/memtable/bulk/part_reader.rs b/src/mito2/src/memtable/bulk/part_reader.rs index 198573fc25..94303a0cca 100644 --- a/src/mito2/src/memtable/bulk/part_reader.rs +++ b/src/mito2/src/memtable/bulk/part_reader.rs @@ -27,6 +27,7 @@ use store_api::storage::SequenceRange; use crate::error::{self, ComputeArrowSnafu, DecodeArrowRowGroupSnafu}; use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef}; use crate::memtable::bulk::row_group_reader::MemtableRowGroupReaderBuilder; +use crate::sst::parquet::file_range::PreFilterMode; use crate::sst::parquet::flat_format::sequence_column_index; use crate::sst::parquet::reader::RowGroupReaderContext; @@ -38,6 +39,8 @@ pub struct EncodedBulkPartIter { builder: MemtableRowGroupReaderBuilder, /// Sequence number filter. sequence: Option, + /// Cached skip_fields for current row group. + current_skip_fields: bool, } impl EncodedBulkPartIter { @@ -58,16 +61,22 @@ impl EncodedBulkPartIter { let builder = MemtableRowGroupReaderBuilder::try_new(&context, projection_mask, parquet_meta, data)?; - let init_reader = row_groups_to_read - .pop_front() - .map(|first_row_group| builder.build_row_group_reader(first_row_group, None)) - .transpose()?; + let (init_reader, current_skip_fields) = match row_groups_to_read.pop_front() { + Some(first_row_group) => { + let skip_fields = builder.compute_skip_fields(&context, first_row_group); + let reader = builder.build_row_group_reader(first_row_group, None)?; + (Some(reader), skip_fields) + } + None => (None, false), + }; + Ok(Self { context, row_groups_to_read, current_reader: init_reader, builder, sequence, + current_skip_fields, }) } @@ -80,19 +89,34 @@ impl EncodedBulkPartIter { for batch in current { let batch = batch.context(DecodeArrowRowGroupSnafu)?; - if let Some(batch) = apply_combined_filters(&self.context, &self.sequence, batch)? { + if let Some(batch) = apply_combined_filters( + &self.context, + &self.sequence, + batch, + self.current_skip_fields, + )? { return Ok(Some(batch)); } } // Previous row group exhausted, read next row group while let Some(next_row_group) = self.row_groups_to_read.pop_front() { + // Compute skip_fields for this row group + self.current_skip_fields = self + .builder + .compute_skip_fields(&self.context, next_row_group); + let next_reader = self.builder.build_row_group_reader(next_row_group, None)?; let current = self.current_reader.insert(next_reader); for batch in current { let batch = batch.context(DecodeArrowRowGroupSnafu)?; - if let Some(batch) = apply_combined_filters(&self.context, &self.sequence, batch)? { + if let Some(batch) = apply_combined_filters( + &self.context, + &self.sequence, + batch, + self.current_skip_fields, + )? { return Ok(Some(batch)); } } @@ -152,8 +176,14 @@ impl BulkPartRecordBatchIter { // Apply projection first. let projected_batch = self.apply_projection(record_batch)?; // Apply combined filtering (both predicate and sequence filters) + // For BulkPartRecordBatchIter, we don't have row group information. + let skip_fields = match self.context.pre_filter_mode() { + PreFilterMode::All => false, + PreFilterMode::SkipFields => true, + PreFilterMode::SkipFieldsOnDelete => true, + }; let Some(filtered_batch) = - apply_combined_filters(&self.context, &self.sequence, projected_batch)? + apply_combined_filters(&self.context, &self.sequence, projected_batch, skip_fields)? else { return Ok(None); }; @@ -181,6 +211,7 @@ fn apply_combined_filters( context: &BulkIterContext, sequence: &Option, record_batch: RecordBatch, + skip_fields: bool, ) -> error::Result> { // Converts the format to the flat format first. let format = context.read_format().as_flat().unwrap(); @@ -191,10 +222,9 @@ fn apply_combined_filters( // First, apply predicate filters using the shared method. if !context.base.filters.is_empty() { - // BulkIterContext always uses PreFilterMode::All, so skip_fields should be false let predicate_mask = context .base - .compute_filter_mask_flat(&record_batch, false)?; + .compute_filter_mask_flat(&record_batch, skip_fields)?; // If predicate filters out the entire batch, return None early let Some(mask) = predicate_mask else { return Ok(None); diff --git a/src/mito2/src/memtable/bulk/row_group_reader.rs b/src/mito2/src/memtable/bulk/row_group_reader.rs index 9918d81871..1e9e5dec4d 100644 --- a/src/mito2/src/memtable/bulk/row_group_reader.rs +++ b/src/mito2/src/memtable/bulk/row_group_reader.rs @@ -169,4 +169,23 @@ impl MemtableRowGroupReaderBuilder { ) .context(ReadDataPartSnafu) } + + /// Computes whether to skip field filters for a specific row group based on PreFilterMode. + pub(crate) fn compute_skip_fields( + &self, + context: &BulkIterContextRef, + row_group_idx: usize, + ) -> bool { + use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete}; + + match context.pre_filter_mode() { + PreFilterMode::All => false, + PreFilterMode::SkipFields => true, + PreFilterMode::SkipFieldsOnDelete => { + // Check if this specific row group contains delete op + row_group_contains_delete(&self.parquet_metadata, row_group_idx, "memtable") + .unwrap_or(true) + } + } + } } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 90df3a5f55..1e9b7f74c9 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -68,6 +68,7 @@ use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef; use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder; use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef; use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; +use crate::sst::parquet::file_range::PreFilterMode; use crate::sst::parquet::reader::ReaderMetrics; /// Parallel scan channel size for flat format. @@ -949,6 +950,17 @@ impl ScanInput { } } + fn pre_filter_mode(&self) -> PreFilterMode { + if self.append_mode { + return PreFilterMode::All; + } + + match self.merge_mode { + MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete, + MergeMode::LastNonNull => PreFilterMode::SkipFields, + } + } + /// Prunes a file to scan and returns the builder to build readers. pub async fn prune_file( &self, @@ -956,6 +968,7 @@ impl ScanInput { reader_metrics: &mut ReaderMetrics, ) -> Result { let predicate = self.predicate_for_file(file); + let filter_mode = self.pre_filter_mode(); let res = self .access_layer .read_sst(file.clone()) @@ -968,6 +981,7 @@ impl ScanInput { .expected_metadata(Some(self.mapper.metadata().clone())) .flat_format(self.flat_format) .compaction(self.compaction) + .pre_filter_mode(filter_mode) .build_reader_input(reader_metrics) .await; let (mut file_range_ctx, selection) = match res { diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 6003ee6f87..d4e9324c87 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -282,7 +282,7 @@ impl FileRangeContext { PreFilterMode::SkipFields => true, PreFilterMode::SkipFieldsOnDelete => { // Check if this specific row group contains delete op - self.contains_delete(row_group_idx).unwrap_or(false) + self.contains_delete(row_group_idx).unwrap_or(true) } } } @@ -299,8 +299,8 @@ impl FileRangeContext { pub(crate) enum PreFilterMode { /// Filters all columns. All, - /// If doesn't contain delete op, filters all columns. - /// Otherwise, skips fields. + /// If the range doesn't contain delete op or doesn't have statistics, filters all columns. + /// Otherwise, skips filtering fields. SkipFieldsOnDelete, /// Always skip fields. SkipFields,