feat: add PreFilterMode to config whether to skip filtering fields

Adds the PreFilterMode to the RangeBase and sets it in
ParquetReaderBuilder

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-10-28 16:43:04 +08:00
parent 53546ad59e
commit 946bd1b813
3 changed files with 141 additions and 30 deletions

View File

@@ -24,7 +24,7 @@ use store_api::storage::ColumnId;
use table::predicate::Predicate;
use crate::error::Result;
use crate::sst::parquet::file_range::RangeBase;
use crate::sst::parquet::file_range::{PreFilterMode, RangeBase};
use crate::sst::parquet::flat_format::FlatReadFormat;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::SimpleFilterContext;
@@ -73,6 +73,7 @@ impl BulkIterContext {
codec,
// we don't need to compat batch since all batch in memtable have the same schema.
compat_batch: None,
pre_filter_mode: PreFilterMode::All,
},
predicate,
})

View File

@@ -26,6 +26,7 @@ use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use parquet::arrow::arrow_reader::RowSelection;
use parquet::file::metadata::ParquetMetaData;
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::storage::{ColumnId, TimeSeriesRowSelector};
@@ -44,6 +45,33 @@ use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{
FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
};
/// Checks if a row group contains delete operations by examining the min value of op_type column.
///
/// Returns `Ok(true)` if the row group contains delete operations, `Ok(false)` if it doesn't,
/// or an error if the statistics are not present or cannot be decoded.
pub(crate) fn row_group_contains_delete(
parquet_meta: &ParquetMetaData,
row_group_index: usize,
file_path: &str,
) -> Result<bool> {
let row_group_metadata = &parquet_meta.row_groups()[row_group_index];
// safety: The last column of SST must be op_type
let column_metadata = &row_group_metadata.columns().last().unwrap();
let stats = column_metadata
.statistics()
.context(StatsNotPresentSnafu { file_path })?;
stats
.min_bytes_opt()
.context(StatsNotPresentSnafu { file_path })?
.try_into()
.map(i32::from_le_bytes)
.map(|min_op_type| min_op_type == OpType::Delete as i32)
.ok()
.context(DecodeStatsSnafu { file_path })
}
/// A range of a parquet SST. Now it is a row group.
/// We can read different file ranges in parallel.
#[derive(Clone)]
@@ -178,6 +206,7 @@ impl FileRangeContext {
filters: Vec<SimpleFilterContext>,
read_format: ReadFormat,
codec: Arc<dyn PrimaryKeyCodec>,
pre_filter_mode: PreFilterMode,
) -> Self {
Self {
reader_builder,
@@ -186,6 +215,7 @@ impl FileRangeContext {
read_format,
codec,
compat_batch: None,
pre_filter_mode,
},
}
}
@@ -234,28 +264,22 @@ impl FileRangeContext {
//// Decodes parquet metadata and finds if row group contains delete op.
pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> {
let metadata = self.reader_builder.parquet_metadata();
let row_group_metadata = &metadata.row_groups()[row_group_index];
// safety: The last column of SST must be op_type
let column_metadata = &row_group_metadata.columns().last().unwrap();
let stats = column_metadata.statistics().context(StatsNotPresentSnafu {
file_path: self.reader_builder.file_path(),
})?;
stats
.min_bytes_opt()
.context(StatsNotPresentSnafu {
file_path: self.reader_builder.file_path(),
})?
.try_into()
.map(i32::from_le_bytes)
.map(|min_op_type| min_op_type == OpType::Delete as i32)
.ok()
.context(DecodeStatsSnafu {
file_path: self.reader_builder.file_path(),
})
row_group_contains_delete(metadata, row_group_index, self.reader_builder.file_path())
}
}
/// Mode to pre-filter columns in a range.
#[derive(Clone, Copy)]
pub(crate) enum PreFilterMode {
/// Filters all columns.
All,
/// If doesn't contain delete op, filters all columns.
/// Otherwise, skips fields.
SkipFieldsOnDelete,
/// Always skip fields.
SkipFields,
}
/// Common fields for a range to read and filter batches.
pub(crate) struct RangeBase {
/// Filters pushed down.
@@ -266,6 +290,8 @@ pub(crate) struct RangeBase {
pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
/// Optional helper to compat batches.
pub(crate) compat_batch: Option<CompatBatch>,
/// Mode to pre-filter columns.
pub(crate) pre_filter_mode: PreFilterMode,
}
impl RangeBase {

View File

@@ -55,7 +55,9 @@ use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef};
use crate::sst::parquet::file_range::{
FileRangeContext, FileRangeContextRef, PreFilterMode, row_group_contains_delete,
};
use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
@@ -117,6 +119,8 @@ pub struct ParquetReaderBuilder {
flat_format: bool,
/// Whether this reader is for compaction.
compaction: bool,
/// Mode to pre-filter columns.
pre_filter_mode: PreFilterMode,
}
impl ParquetReaderBuilder {
@@ -141,6 +145,7 @@ impl ParquetReaderBuilder {
expected_metadata: None,
flat_format: false,
compaction: false,
pre_filter_mode: PreFilterMode::All,
}
}
@@ -218,6 +223,13 @@ impl ParquetReaderBuilder {
self
}
/// Sets the pre-filter mode.
#[must_use]
pub(crate) fn pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
self.pre_filter_mode = pre_filter_mode;
self
}
/// Builds a [ParquetReader].
///
/// This needs to perform IO operation.
@@ -321,7 +333,13 @@ impl ParquetReaderBuilder {
let codec = build_primary_key_codec(read_format.metadata());
let context = FileRangeContext::new(reader_builder, filters, read_format, codec);
let context = FileRangeContext::new(
reader_builder,
filters,
read_format,
codec,
self.pre_filter_mode,
);
metrics.build_cost += start.elapsed();
@@ -407,7 +425,16 @@ impl ParquetReaderBuilder {
let mut output = RowGroupSelection::new(row_group_size, num_rows as _);
self.prune_row_groups_by_minmax(read_format, parquet_meta, &mut output, metrics);
// Compute skip_fields once for all pruning operations
let skip_fields = self.compute_skip_fields(parquet_meta);
self.prune_row_groups_by_minmax(
read_format,
parquet_meta,
&mut output,
metrics,
skip_fields,
);
if output.is_empty() {
return output;
}
@@ -418,6 +445,7 @@ impl ParquetReaderBuilder {
num_row_groups,
&mut output,
metrics,
skip_fields,
)
.await;
if output.is_empty() {
@@ -429,14 +457,21 @@ impl ParquetReaderBuilder {
num_row_groups,
&mut output,
metrics,
skip_fields,
)
.await;
if output.is_empty() {
return output;
}
self.prune_row_groups_by_bloom_filter(row_group_size, parquet_meta, &mut output, metrics)
.await;
self.prune_row_groups_by_bloom_filter(
row_group_size,
parquet_meta,
&mut output,
metrics,
skip_fields,
)
.await;
if output.is_empty() {
return output;
}
@@ -447,6 +482,7 @@ impl ParquetReaderBuilder {
parquet_meta,
&mut output,
metrics,
skip_fields,
)
.await;
}
@@ -460,13 +496,20 @@ impl ParquetReaderBuilder {
num_row_groups: usize,
output: &mut RowGroupSelection,
metrics: &mut ReaderFilterMetrics,
skip_fields: bool,
) -> bool {
if !self.file_handle.meta_ref().fulltext_index_available() {
return false;
}
let mut pruned = false;
for index_applier in self.fulltext_index_appliers.iter().flatten() {
// If skip_fields is true, only apply the first applier (for tags).
let appliers = if skip_fields {
&self.fulltext_index_appliers[..1]
} else {
&self.fulltext_index_appliers[..]
};
for index_applier in appliers.iter().flatten() {
let predicate_key = index_applier.predicate_key();
// Fast path: return early if the result is in the cache.
let cached = self
@@ -521,13 +564,20 @@ impl ParquetReaderBuilder {
num_row_groups: usize,
output: &mut RowGroupSelection,
metrics: &mut ReaderFilterMetrics,
skip_fields: bool,
) -> bool {
if !self.file_handle.meta_ref().inverted_index_available() {
return false;
}
let mut pruned = false;
for index_applier in self.inverted_index_appliers.iter().flatten() {
// If skip_fields is true, only apply the first applier (for tags).
let appliers = if skip_fields {
&self.inverted_index_appliers[..1]
} else {
&self.inverted_index_appliers[..]
};
for index_applier in appliers.iter().flatten() {
let predicate_key = index_applier.predicate_key();
// Fast path: return early if the result is in the cache.
let cached = self
@@ -578,13 +628,20 @@ impl ParquetReaderBuilder {
parquet_meta: &ParquetMetaData,
output: &mut RowGroupSelection,
metrics: &mut ReaderFilterMetrics,
skip_fields: bool,
) -> bool {
if !self.file_handle.meta_ref().bloom_filter_index_available() {
return false;
}
let mut pruned = false;
for index_applier in self.bloom_filter_index_appliers.iter().flatten() {
// If skip_fields is true, only apply the first applier (for tags).
let appliers = if skip_fields {
&self.bloom_filter_index_appliers[..1]
} else {
&self.bloom_filter_index_appliers[..]
};
for index_applier in appliers.iter().flatten() {
let predicate_key = index_applier.predicate_key();
// Fast path: return early if the result is in the cache.
let cached = self
@@ -649,13 +706,20 @@ impl ParquetReaderBuilder {
parquet_meta: &ParquetMetaData,
output: &mut RowGroupSelection,
metrics: &mut ReaderFilterMetrics,
skip_fields: bool,
) -> bool {
if !self.file_handle.meta_ref().fulltext_index_available() {
return false;
}
let mut pruned = false;
for index_applier in self.fulltext_index_appliers.iter().flatten() {
// If skip_fields is true, only apply the first applier (for tags).
let appliers = if skip_fields {
&self.fulltext_index_appliers[..1]
} else {
&self.fulltext_index_appliers[..]
};
for index_applier in appliers.iter().flatten() {
let predicate_key = index_applier.predicate_key();
// Fast path: return early if the result is in the cache.
let cached = self
@@ -715,6 +779,25 @@ impl ParquetReaderBuilder {
pruned
}
/// Computes whether to skip field columns when building statistics based on PreFilterMode.
fn compute_skip_fields(&self, parquet_meta: &ParquetMetaData) -> bool {
match self.pre_filter_mode {
PreFilterMode::All => false,
PreFilterMode::SkipFields => true,
PreFilterMode::SkipFieldsOnDelete => {
// Check if any row group contains delete op
let file_path = self.file_handle.file_path(&self.file_dir, self.path_type);
(0..parquet_meta.num_row_groups()).any(|rg_idx| {
row_group_contains_delete(parquet_meta, rg_idx, &file_path)
.inspect_err(|e| {
warn!(e; "Failed to decode min value of op_type, fallback to not skipping fields");
})
.unwrap_or(false)
})
}
}
}
/// Prunes row groups by min-max index.
fn prune_row_groups_by_minmax(
&self,
@@ -722,6 +805,7 @@ impl ParquetReaderBuilder {
parquet_meta: &ParquetMetaData,
output: &mut RowGroupSelection,
metrics: &mut ReaderFilterMetrics,
skip_fields: bool,
) -> bool {
let Some(predicate) = &self.predicate else {
return false;
@@ -735,7 +819,7 @@ impl ParquetReaderBuilder {
row_groups,
read_format,
self.expected_metadata.clone(),
false,
skip_fields,
);
let prune_schema = self
.expected_metadata