From 946bd1b813e2359de82820ecfe3502bb706e20c9 Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 28 Oct 2025 16:43:04 +0800 Subject: [PATCH] feat: add PreFilterMode to config whether to skip filtering fields Adds the PreFilterMode to the RangeBase and sets it in ParquetReaderBuilder Signed-off-by: evenyag --- src/mito2/src/memtable/bulk/context.rs | 3 +- src/mito2/src/sst/parquet/file_range.rs | 64 ++++++++++----- src/mito2/src/sst/parquet/reader.rs | 104 +++++++++++++++++++++--- 3 files changed, 141 insertions(+), 30 deletions(-) diff --git a/src/mito2/src/memtable/bulk/context.rs b/src/mito2/src/memtable/bulk/context.rs index d3c5912115..e4f8c47672 100644 --- a/src/mito2/src/memtable/bulk/context.rs +++ b/src/mito2/src/memtable/bulk/context.rs @@ -24,7 +24,7 @@ use store_api::storage::ColumnId; use table::predicate::Predicate; use crate::error::Result; -use crate::sst::parquet::file_range::RangeBase; +use crate::sst::parquet::file_range::{PreFilterMode, RangeBase}; use crate::sst::parquet::flat_format::FlatReadFormat; use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::reader::SimpleFilterContext; @@ -73,6 +73,7 @@ impl BulkIterContext { codec, // we don't need to compat batch since all batch in memtable have the same schema. 
compat_batch: None, + pre_filter_mode: PreFilterMode::All, }, predicate, }) diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 268391135b..97697517b6 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -26,6 +26,7 @@ use datatypes::arrow::buffer::BooleanBuffer; use datatypes::arrow::record_batch::RecordBatch; use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec}; use parquet::arrow::arrow_reader::RowSelection; +use parquet::file::metadata::ParquetMetaData; use snafu::{OptionExt, ResultExt}; use store_api::codec::PrimaryKeyEncoding; use store_api::storage::{ColumnId, TimeSeriesRowSelector}; @@ -44,6 +45,33 @@ use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::reader::{ FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext, }; + +/// Checks if a row group contains delete operations by examining the min value of op_type column. +/// +/// Returns `Ok(true)` if the row group contains delete operations, `Ok(false)` if it doesn't, +/// or an error if the statistics are not present or cannot be decoded. +pub(crate) fn row_group_contains_delete( + parquet_meta: &ParquetMetaData, + row_group_index: usize, + file_path: &str, +) -> Result<bool> { + let row_group_metadata = &parquet_meta.row_groups()[row_group_index]; + + // safety: The last column of SST must be op_type + let column_metadata = &row_group_metadata.columns().last().unwrap(); + let stats = column_metadata + .statistics() + .context(StatsNotPresentSnafu { file_path })?; + stats + .min_bytes_opt() + .context(StatsNotPresentSnafu { file_path })? + .try_into() + .map(i32::from_le_bytes) + .map(|min_op_type| min_op_type == OpType::Delete as i32) + .ok() + .context(DecodeStatsSnafu { file_path }) +} + /// A range of a parquet SST. Now it is a row group. /// We can read different file ranges in parallel. 
#[derive(Clone)] @@ -178,6 +206,7 @@ impl FileRangeContext { filters: Vec<SimpleFilterContext>, read_format: ReadFormat, codec: Arc<dyn PrimaryKeyCodec>, + pre_filter_mode: PreFilterMode, ) -> Self { Self { reader_builder, @@ -186,6 +215,7 @@ impl FileRangeContext { read_format, codec, compat_batch: None, + pre_filter_mode, }, } } @@ -234,28 +264,22 @@ impl FileRangeContext { //// Decodes parquet metadata and finds if row group contains delete op. pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> { let metadata = self.reader_builder.parquet_metadata(); - let row_group_metadata = &metadata.row_groups()[row_group_index]; - - // safety: The last column of SST must be op_type - let column_metadata = &row_group_metadata.columns().last().unwrap(); - let stats = column_metadata.statistics().context(StatsNotPresentSnafu { - file_path: self.reader_builder.file_path(), - })?; - stats - .min_bytes_opt() - .context(StatsNotPresentSnafu { - file_path: self.reader_builder.file_path(), - })? - .try_into() - .map(i32::from_le_bytes) - .map(|min_op_type| min_op_type == OpType::Delete as i32) - .ok() - .context(DecodeStatsSnafu { - file_path: self.reader_builder.file_path(), - }) + row_group_contains_delete(metadata, row_group_index, self.reader_builder.file_path()) } } +/// Mode to pre-filter columns in a range. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum PreFilterMode { + /// Filters all columns. + All, + /// Filters all columns if the range doesn't contain delete op. + /// Otherwise, skips fields. + SkipFieldsOnDelete, + /// Always skip fields. + SkipFields, +} + /// Common fields for a range to read and filter batches. pub(crate) struct RangeBase { /// Filters pushed down. @@ -266,6 +290,8 @@ pub(crate) struct RangeBase { pub(crate) codec: Arc<dyn PrimaryKeyCodec>, /// Optional helper to compat batches. pub(crate) compat_batch: Option, + /// Mode to pre-filter columns. 
+ pub(crate) pre_filter_mode: PreFilterMode, } impl RangeBase { diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index c4364c08bf..3f5eb9b687 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -55,7 +55,9 @@ use crate::sst::file::FileHandle; use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef; use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef; use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef; -use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef}; +use crate::sst::parquet::file_range::{ + FileRangeContext, FileRangeContextRef, PreFilterMode, row_group_contains_delete, +}; use crate::sst::parquet::format::{ReadFormat, need_override_sequence}; use crate::sst::parquet::metadata::MetadataLoader; use crate::sst::parquet::row_group::InMemoryRowGroup; @@ -117,6 +119,8 @@ pub struct ParquetReaderBuilder { flat_format: bool, /// Whether this reader is for compaction. compaction: bool, + /// Mode to pre-filter columns. + pre_filter_mode: PreFilterMode, } impl ParquetReaderBuilder { @@ -141,6 +145,7 @@ impl ParquetReaderBuilder { expected_metadata: None, flat_format: false, compaction: false, + pre_filter_mode: PreFilterMode::All, } } @@ -218,6 +223,13 @@ impl ParquetReaderBuilder { self } + /// Sets the pre-filter mode. + #[must_use] + pub(crate) fn pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self { + self.pre_filter_mode = pre_filter_mode; + self + } + /// Builds a [ParquetReader]. /// /// This needs to perform IO operation. 
@@ -321,7 +333,13 @@ impl ParquetReaderBuilder { let codec = build_primary_key_codec(read_format.metadata()); - let context = FileRangeContext::new(reader_builder, filters, read_format, codec); + let context = FileRangeContext::new( + reader_builder, + filters, + read_format, + codec, + self.pre_filter_mode, + ); metrics.build_cost += start.elapsed(); @@ -407,7 +425,16 @@ impl ParquetReaderBuilder { let mut output = RowGroupSelection::new(row_group_size, num_rows as _); - self.prune_row_groups_by_minmax(read_format, parquet_meta, &mut output, metrics); + // Compute skip_fields once for all pruning operations + let skip_fields = self.compute_skip_fields(parquet_meta); + + self.prune_row_groups_by_minmax( + read_format, + parquet_meta, + &mut output, + metrics, + skip_fields, + ); if output.is_empty() { return output; } @@ -418,6 +445,7 @@ impl ParquetReaderBuilder { num_row_groups, &mut output, metrics, + skip_fields, ) .await; if output.is_empty() { @@ -429,14 +457,21 @@ impl ParquetReaderBuilder { num_row_groups, &mut output, metrics, + skip_fields, ) .await; if output.is_empty() { return output; } - self.prune_row_groups_by_bloom_filter(row_group_size, parquet_meta, &mut output, metrics) - .await; + self.prune_row_groups_by_bloom_filter( + row_group_size, + parquet_meta, + &mut output, + metrics, + skip_fields, + ) + .await; if output.is_empty() { return output; } @@ -447,6 +482,7 @@ impl ParquetReaderBuilder { parquet_meta, &mut output, metrics, + skip_fields, ) .await; } @@ -460,13 +496,20 @@ impl ParquetReaderBuilder { num_row_groups: usize, output: &mut RowGroupSelection, metrics: &mut ReaderFilterMetrics, + skip_fields: bool, ) -> bool { if !self.file_handle.meta_ref().fulltext_index_available() { return false; } let mut pruned = false; - for index_applier in self.fulltext_index_appliers.iter().flatten() { + // If skip_fields is true, only apply the first applier (for tags). 
+ let appliers = if skip_fields { + &self.fulltext_index_appliers[..1] + } else { + &self.fulltext_index_appliers[..] + }; + for index_applier in appliers.iter().flatten() { let predicate_key = index_applier.predicate_key(); // Fast path: return early if the result is in the cache. let cached = self @@ -521,13 +564,20 @@ impl ParquetReaderBuilder { num_row_groups: usize, output: &mut RowGroupSelection, metrics: &mut ReaderFilterMetrics, + skip_fields: bool, ) -> bool { if !self.file_handle.meta_ref().inverted_index_available() { return false; } let mut pruned = false; - for index_applier in self.inverted_index_appliers.iter().flatten() { + // If skip_fields is true, only apply the first applier (for tags). + let appliers = if skip_fields { + &self.inverted_index_appliers[..1] + } else { + &self.inverted_index_appliers[..] + }; + for index_applier in appliers.iter().flatten() { let predicate_key = index_applier.predicate_key(); // Fast path: return early if the result is in the cache. let cached = self @@ -578,13 +628,20 @@ impl ParquetReaderBuilder { parquet_meta: &ParquetMetaData, output: &mut RowGroupSelection, metrics: &mut ReaderFilterMetrics, + skip_fields: bool, ) -> bool { if !self.file_handle.meta_ref().bloom_filter_index_available() { return false; } let mut pruned = false; - for index_applier in self.bloom_filter_index_appliers.iter().flatten() { + // If skip_fields is true, only apply the first applier (for tags). + let appliers = if skip_fields { + &self.bloom_filter_index_appliers[..1] + } else { + &self.bloom_filter_index_appliers[..] + }; + for index_applier in appliers.iter().flatten() { let predicate_key = index_applier.predicate_key(); // Fast path: return early if the result is in the cache. 
let cached = self @@ -649,13 +706,20 @@ impl ParquetReaderBuilder { parquet_meta: &ParquetMetaData, output: &mut RowGroupSelection, metrics: &mut ReaderFilterMetrics, + skip_fields: bool, ) -> bool { if !self.file_handle.meta_ref().fulltext_index_available() { return false; } let mut pruned = false; - for index_applier in self.fulltext_index_appliers.iter().flatten() { + // If skip_fields is true, only apply the first applier (for tags). + let appliers = if skip_fields { + &self.fulltext_index_appliers[..1] + } else { + &self.fulltext_index_appliers[..] + }; + for index_applier in appliers.iter().flatten() { let predicate_key = index_applier.predicate_key(); // Fast path: return early if the result is in the cache. let cached = self @@ -715,6 +779,25 @@ impl ParquetReaderBuilder { pruned } + /// Computes whether to skip field columns when building statistics based on PreFilterMode. + fn compute_skip_fields(&self, parquet_meta: &ParquetMetaData) -> bool { + match self.pre_filter_mode { + PreFilterMode::All => false, + PreFilterMode::SkipFields => true, + PreFilterMode::SkipFieldsOnDelete => { + // Skip fields if any row group contains (or may contain) delete op + let file_path = self.file_handle.file_path(&self.file_dir, self.path_type); + (0..parquet_meta.num_row_groups()).any(|rg_idx| { + row_group_contains_delete(parquet_meta, rg_idx, &file_path) + .inspect_err(|e| { + warn!(e; "Failed to decode min value of op_type, fallback to skipping fields"); + }) + .unwrap_or(true) + }) + } + } + } + /// Prunes row groups by min-max index. fn prune_row_groups_by_minmax( &self, @@ -722,6 +805,7 @@ impl ParquetReaderBuilder { parquet_meta: &ParquetMetaData, output: &mut RowGroupSelection, metrics: &mut ReaderFilterMetrics, + skip_fields: bool, ) -> bool { let Some(predicate) = &self.predicate else { return false; @@ -735,7 +819,7 @@ impl ParquetReaderBuilder { row_groups, read_format, self.expected_metadata.clone(), - false, + skip_fields, ); let prune_schema = self .expected_metadata