From 6a84393e089c4cc9a8df3bdc569411ece9ea1b35 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Wed, 29 Apr 2026 11:35:22 +0800 Subject: [PATCH] feat: support prefiltering any columns in flat format (#7972) * refactor: prepare parquet prefilter for multi-column execution Signed-off-by: evenyag * refactor: restore parquet physical filter contexts Signed-off-by: evenyag * feat: add generalized parquet prefilter projection Signed-off-by: evenyag * refactor: avoid re-evaluating parquet prefiltered predicates Signed-off-by: evenyag * test: cover generalized parquet prefilter behavior Signed-off-by: evenyag * refactor: remove variant Signed-off-by: evenyag * feat: only prefilter physical exprs Signed-off-by: evenyag * refactor: remove execute_general_prefilter Signed-off-by: evenyag * feat: only prefilter cheap exprs Signed-off-by: evenyag * refactor: context usage Signed-off-by: evenyag * refactor: categorize filters Signed-off-by: evenyag * refactor: prefilter plan for bulk memtable Signed-off-by: evenyag * refactor: move parquet filter plan builders into prefilter Signed-off-by: evenyag * chore: comment Signed-off-by: evenyag * test: simplify tests Signed-off-by: evenyag * feat: enable prefilter by threshold Signed-off-by: evenyag * fix: correct pk filter grouping Signed-off-by: evenyag * chore: update comment Signed-off-by: evenyag * chore: remove unused code Signed-off-by: evenyag * chore: fix warning Signed-off-by: evenyag * fix: handle nulls in physical filter result Signed-off-by: evenyag * chore: fmt import Signed-off-by: evenyag * docs: update comments Signed-off-by: evenyag --------- Signed-off-by: evenyag --- src/mito2/src/memtable/bulk/context.rs | 45 +- src/mito2/src/memtable/bulk/part_reader.rs | 2 - src/mito2/src/sst/parquet/file_range.rs | 67 +- src/mito2/src/sst/parquet/prefilter.rs | 930 +++++++++++++++++---- src/mito2/src/sst/parquet/reader.rs | 292 +++++-- src/mito2/src/sst/parquet/row_selection.rs | 70 -- src/table/src/predicate.rs | 22 +- 7 files 
changed, 1037 insertions(+), 391 deletions(-) diff --git a/src/mito2/src/memtable/bulk/context.rs b/src/mito2/src/memtable/bulk/context.rs index 9fa3c641a7..9290fa17b0 100644 --- a/src/mito2/src/memtable/bulk/context.rs +++ b/src/mito2/src/memtable/bulk/context.rs @@ -27,8 +27,7 @@ use table::predicate::Predicate; use crate::error::Result; use crate::sst::parquet::file_range::{PreFilterMode, RangeBase}; use crate::sst::parquet::flat_format::FlatReadFormat; -use crate::sst::parquet::prefilter::CachedPrimaryKeyFilter; -use crate::sst::parquet::reader::SimpleFilterContext; +use crate::sst::parquet::prefilter::{CachedPrimaryKeyFilter, build_bulk_filter_plan}; use crate::sst::parquet::stats::RowGroupPruningStats; pub(crate) type BulkIterContextRef = Arc; @@ -66,17 +65,6 @@ impl BulkIterContext { ) -> Result { let codec = build_primary_key_codec(®ion_metadata); - let simple_filters: Vec = predicate - .as_ref() - .iter() - .flat_map(|predicate| { - predicate - .exprs() - .iter() - .filter_map(|expr| SimpleFilterContext::new_opt(®ion_metadata, None, expr)) - }) - .collect(); - let read_format = if let Some(column_ids) = projection { FlatReadFormat::new( region_metadata.clone(), @@ -103,12 +91,11 @@ impl BulkIterContext { .map(|pred| pred.dyn_filters().as_ref().clone()) .unwrap_or_default(); - // Pre-extract PK filters if applicable. - let pk_filters = Self::extract_pk_filters(&read_format, &simple_filters); + let filter_plan = build_bulk_filter_plan(&read_format, predicate.as_ref()); Ok(Self { base: RangeBase { - filters: simple_filters, + filters: filter_plan.remaining_simple_filters, dyn_filters, read_format, prune_schema: region_metadata.schema.clone(), @@ -121,7 +108,7 @@ impl BulkIterContext { partition_filter: None, }, predicate, - pk_filters, + pk_filters: filter_plan.pk_filters, }) } @@ -153,30 +140,6 @@ impl BulkIterContext { } } - /// Extracts PK filters if flat format with dictionary-encoded PKs is used. 
- fn extract_pk_filters( - read_format: &FlatReadFormat, - filters: &[SimpleFilterContext], - ) -> Option>> { - if read_format.batch_has_raw_pk_columns() { - return None; - } - let metadata = read_format.metadata(); - if metadata.primary_key.is_empty() { - return None; - } - - let pk_filters: Vec<_> = filters - .iter() - .filter_map(|f| f.primary_key_prefilter()) - .collect(); - if pk_filters.is_empty() { - return None; - } - - Some(Arc::new(pk_filters)) - } - /// Builds a fresh PK filter for a new iterator. Returns `None` if PK /// prefiltering is not applicable. pub(crate) fn build_pk_filter(&self) -> Option { diff --git a/src/mito2/src/memtable/bulk/part_reader.rs b/src/mito2/src/memtable/bulk/part_reader.rs index ac069f20be..11cb5873d5 100644 --- a/src/mito2/src/memtable/bulk/part_reader.rs +++ b/src/mito2/src/memtable/bulk/part_reader.rs @@ -380,7 +380,6 @@ fn apply_combined_filters( metrics: &mut MemScanMetricsData, ) -> error::Result> { // Apply PK prefilter on raw batch before convert_batch to reduce conversion overhead. 
- let has_pk_prefilter = pk_filter.is_some(); let record_batch = if let Some(pk_filter) = pk_filter { let rows_before = record_batch.num_rows(); let prefilter_start = Instant::now(); @@ -413,7 +412,6 @@ fn apply_combined_filters( let predicate_mask = context.base.compute_filter_mask_flat( &record_batch, skip_fields, - has_pk_prefilter, &mut tag_decode_state, )?; // If predicate filters out the entire batch, return None early diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 8034310eee..0415cdbfb3 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -172,7 +172,6 @@ impl FileRange { self.row_group_idx, self.row_selection.clone(), fetch_metrics, - skip_fields, )) .await?; @@ -199,7 +198,7 @@ impl FileRange { FlatRowGroupReader::new(self.context.clone(), parquet_reader); // Flat PK prefilter makes the input stream predicate-dependent, so cached // selector results are not reusable across queries with different filters. - let cache_strategy = if self.context.reader_builder.has_flat_primary_key_prefilter() { + let cache_strategy = if self.context.reader_builder.has_predicate_prefilter() { CacheStrategy::Disabled } else { self.context.reader_builder.cache_strategy().clone() @@ -297,16 +296,13 @@ impl FileRangeContext { /// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch. /// If a partition expr filter is configured, it is also applied. + /// Physical filter exprs are not evaluated here; they are only applied during prefiltering. 
pub(crate) fn precise_filter_flat( &self, input: RecordBatch, skip_fields: bool, ) -> Result> { - self.base.precise_filter_flat( - input, - skip_fields, - self.reader_builder.has_flat_primary_key_prefilter(), - ) + self.base.precise_filter_flat(input, skip_fields) } pub(crate) fn pre_filter_mode(&self) -> PreFilterMode { @@ -325,11 +321,8 @@ impl FileRangeContext { row_group_idx: usize, row_selection: Option, fetch_metrics: Option<&'a ParquetFetchMetrics>, - skip_fields: bool, ) -> RowGroupBuildContext<'a> { RowGroupBuildContext { - filters: &self.base.filters, - skip_fields, row_group_idx, row_selection, fetch_metrics, @@ -344,7 +337,7 @@ impl FileRangeContext { } /// Mode to pre-filter columns in a range. -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum PreFilterMode { /// Filters all columns. All, @@ -415,15 +408,9 @@ impl RangeBase { &self, input: RecordBatch, skip_fields: bool, - skip_prefiltered_pk_filters: bool, ) -> Result> { let mut tag_decode_state = TagDecodeState::new(); - let mask = self.compute_filter_mask_flat( - &input, - skip_fields, - skip_prefiltered_pk_filters, - &mut tag_decode_state, - )?; + let mask = self.compute_filter_mask_flat(&input, skip_fields, &mut tag_decode_state)?; // If mask is None, the entire batch is filtered out let Some(mut mask) = mask else { @@ -458,6 +445,7 @@ impl RangeBase { /// Computes the filter mask for the input RecordBatch based on pushed down predicates. /// If a partition expr filter is configured, it is applied later in `precise_filter_flat` but **NOT** in this function. + /// Physical filter exprs are excluded here and only apply during prefiltering. /// /// Returns `None` if the entire batch is filtered out, otherwise returns the boolean mask. 
/// @@ -468,7 +456,6 @@ impl RangeBase { &self, input: &RecordBatch, skip_fields: bool, - skip_prefiltered_pk_filters: bool, tag_decode_state: &mut TagDecodeState, ) -> Result> { let mut mask = BooleanBuffer::new_set(input.num_rows()); @@ -490,12 +477,6 @@ impl RangeBase { continue; } - // Flat parquet PK prefiltering already applied these tag predicates while refining - // row selection, so skip them here to avoid decoding/evaluating the same condition twice. - if skip_prefiltered_pk_filters && filter_ctx.usable_primary_key_filter() { - continue; - } - // Get the column directly by its projected index. // If the column is missing and it's not a tag/time column, this filter is skipped. // Assumes the projection indices align with the input batch schema. @@ -703,7 +684,7 @@ mod tests { } #[test] - fn test_compute_filter_mask_flat_skips_prefiltered_pk_filters() { + fn test_compute_filter_mask_flat_applies_remaining_simple_filters() { let metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); let filters = vec![ SimpleFilterContext::new_opt(&metadata, None, &col("tag_0").eq(lit("a"))).unwrap(), @@ -712,16 +693,38 @@ mod tests { let base = new_test_range_base(filters); let batch = new_record_batch_with_custom_sequence(&["b", "x"], 0, 4, 1); - let mask_without_skip = base - .compute_filter_mask_flat(&batch, false, false, &mut TagDecodeState::new()) + let mask = base + .compute_filter_mask_flat(&batch, false, &mut TagDecodeState::new()) .unwrap() .unwrap(); - assert_eq!(mask_without_skip.count_set_bits(), 0); + assert_eq!(mask.count_set_bits(), 0); + } - let mask_with_skip = base - .compute_filter_mask_flat(&batch, false, true, &mut TagDecodeState::new()) + #[test] + fn test_compute_filter_mask_flat_does_not_postfilter_physical_filters() { + let metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); + let read_format = FlatReadFormat::new( + metadata.clone(), + metadata.column_metadatas.iter().map(|c| c.column_id), + None, + "test", + true, + ) + 
.unwrap(); + let physical_filter = crate::sst::parquet::reader::PhysicalFilterContext::new_opt( + &metadata, + None, + &read_format, + &col("field_0").in_list(vec![lit(1_u64), lit(2_u64)], false), + ); + assert!(physical_filter.is_some()); + let base = new_test_range_base(vec![]); + let batch = new_record_batch_with_custom_sequence(&["b", "x"], 0, 4, 1); + + let mask = base + .compute_filter_mask_flat(&batch, false, &mut TagDecodeState::new()) .unwrap() .unwrap(); - assert_eq!(mask_with_skip.count_set_bits(), 2); + assert_eq!(mask.count_set_bits(), 4); } } diff --git a/src/mito2/src/sst/parquet/prefilter.rs b/src/mito2/src/sst/parquet/prefilter.rs index c722e442cb..7c6ce9e156 100644 --- a/src/mito2/src/sst/parquet/prefilter.rs +++ b/src/mito2/src/sst/parquet/prefilter.rs @@ -18,12 +18,14 @@ //! (the prefilter phase), applying filters to compute a refined row selection, //! then reading the remaining columns with the refined selection. -use std::ops::Range; +use std::collections::HashSet; +use std::ops::{BitAnd, Range}; use std::sync::Arc; use api::v1::SemanticType; use common_recordbatch::filter::SimpleFilterEvaluator; -use datatypes::arrow::array::{BinaryArray, BooleanArray, BooleanBufferBuilder}; +use datatypes::arrow::array::{Array, BinaryArray, BooleanArray, BooleanBufferBuilder}; +use datatypes::arrow::buffer::BooleanBuffer; use datatypes::arrow::record_batch::RecordBatch; use futures::StreamExt; use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter}; @@ -32,12 +34,20 @@ use parquet::arrow::arrow_reader::RowSelection; use parquet::schema::types::SchemaDescriptor; use snafu::{OptionExt, ResultExt}; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; +use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME; +use table::predicate::Predicate; -use crate::error::{ComputeArrowSnafu, DecodeSnafu, ReadParquetSnafu, Result, UnexpectedSnafu}; -use crate::sst::parquet::flat_format::{FlatReadFormat, primary_key_column_index}; +use crate::error::{ + 
ComputeArrowSnafu, DecodeSnafu, EvalPartitionFilterSnafu, NewRecordBatchSnafu, + ReadParquetSnafu, RecordBatchSnafu, Result, UnexpectedSnafu, +}; +use crate::sst::parquet::file_range::PreFilterMode; +use crate::sst::parquet::flat_format::FlatReadFormat; use crate::sst::parquet::format::PrimaryKeyArray; -use crate::sst::parquet::reader::{RowGroupBuildContext, RowGroupReaderBuilder}; -use crate::sst::parquet::row_selection::row_selection_from_row_ranges_exact; +use crate::sst::parquet::reader::{ + MaybeFilter, PhysicalFilterContext, RowGroupBuildContext, RowGroupReaderBuilder, + SimpleFilterContext, +}; pub(crate) fn matching_row_ranges_by_primary_key( input: &RecordBatch, @@ -142,48 +152,6 @@ pub(crate) fn prefilter_flat_batch_by_primary_key( } } -/// Returns whether a filter can be applied by parquet primary-key prefiltering. -/// -/// Unlike `PartitionTreeMemtable`, parquet prefilter always supports predicates -/// on the partition column. -pub(crate) fn is_usable_primary_key_filter( - sst_metadata: &RegionMetadataRef, - expected_metadata: Option<&RegionMetadata>, - filter: &SimpleFilterEvaluator, -) -> bool { - let sst_column = match expected_metadata { - Some(expected_metadata) => { - let Some(expected_column) = expected_metadata.column_by_name(filter.column_name()) - else { - return false; - }; - let Some(sst_column) = sst_metadata.column_by_id(expected_column.column_id) else { - return false; - }; - - if sst_column.column_schema.name != expected_column.column_schema.name - || sst_column.semantic_type != expected_column.semantic_type - || sst_column.column_schema.data_type != expected_column.column_schema.data_type - { - return false; - } - - sst_column - } - None => { - let Some(sst_column) = sst_metadata.column_by_name(filter.column_name()) else { - return false; - }; - sst_column - } - }; - - sst_column.semantic_type == SemanticType::Tag - && sst_metadata - .primary_key_index(sst_column.column_id) - .is_some() -} - pub(crate) struct CachedPrimaryKeyFilter { 
inner: Box, last_primary_key: Vec, @@ -216,18 +184,233 @@ impl PrimaryKeyFilter for CachedPrimaryKeyFilter { } } -/// Context for prefiltering a row group. +/// How the bulk-memtable read should apply each predicate. /// -/// Currently supports primary key (PK) filtering only. -/// Will be extended with simple column filters and physical filters in the future. +/// Unlike the parquet reader, the bulk path has no prefilter pass; predicates +/// either run row-wise inside the iterator or are pushed down to encoded-PK +/// matching when the batch still carries the primary-key column. +pub(crate) struct BulkFilterPlan { + /// Simple filters the iterator still has to evaluate row-wise on each batch. + pub(crate) remaining_simple_filters: Vec, + /// Tag predicates lowered to encoded-PK filters. `None` when the batch + /// already exposes raw tag columns or there are no tag predicates. + pub(crate) pk_filters: Option>>, +} + +/// How the parquet reader should apply each predicate. +/// +/// The reader runs in two phases. Predicates routed into `prefilter_builder` +/// execute on a reduced column set first to compute a refined row selection; +/// `remaining_simple_filters` execute alongside the full projection on the +/// normal read path. The contract for what is precise vs best-effort is +/// documented on [`build_reader_filter_plan`]. +pub(crate) struct ReaderFilterPlan { + /// Simple filters that must run on the normal read path: predicates with + /// `Matched` / `Pruned` outcomes (which carry expected-metadata + /// compatibility decisions later phases rely on), and predicates whose + /// column cannot be read directly during the prefilter pass. + pub(crate) remaining_simple_filters: Vec, + /// Pre-built state for the prefilter pass, or `None` when prefiltering is + /// not worthwhile (no prefilter columns selected, or the prefilter + /// projection would cover nearly the full read). 
+ pub(crate) prefilter_builder: Option, +} + +pub(crate) fn build_bulk_filter_plan( + read_format: &FlatReadFormat, + predicate: Option<&Predicate>, +) -> BulkFilterPlan { + let metadata = read_format.metadata(); + // Bulk memtable only needs simple binary filters here. Any filter that + // cannot be reduced to a SimpleFilterContext stays out of this fast path. + let simple_filters: Vec = predicate + .into_iter() + .flat_map(|predicate| { + predicate + .exprs() + .iter() + .filter_map(|expr| SimpleFilterContext::new_opt(metadata, None, expr)) + }) + .collect(); + + // PK prefilter only works when flat batches still carry the encoded PK + // column. If tags have already been expanded to raw columns, the iterator + // can apply those filters directly and there is nothing to extract here. + if read_format.batch_has_raw_pk_columns() || metadata.primary_key.is_empty() { + return BulkFilterPlan { + remaining_simple_filters: simple_filters, + pk_filters: None, + }; + } + + let mut remaining_simple_filters = Vec::new(); + let mut pk_filters = Vec::new(); + + for filter_ctx in simple_filters { + // Split tag predicates that can be evaluated against the encoded PK + // from filters that still need normal row-wise evaluation later. + let pk_filter = filter_ctx.filter().as_filter().and_then(|filter| { + (filter_ctx.semantic_type() == SemanticType::Tag).then(|| filter.clone()) + }); + + if let Some(pk_filter) = pk_filter { + pk_filters.push(pk_filter); + } else { + remaining_simple_filters.push(filter_ctx); + } + } + + BulkFilterPlan { + remaining_simple_filters, + pk_filters: (!pk_filters.is_empty()).then_some(Arc::new(pk_filters)), + } +} + +/// Splits a query [`Predicate`] into a [`ReaderFilterPlan`]: predicates that can run +/// during the prefilter pass (on a reduced projection, to compute a refined row +/// selection) versus predicates that must run on the normal read path (alongside the +/// full projection). 
+/// +/// The prefilter pass is *best-effort pruning*: a physical-filter predicate is silently +/// dropped when [`PhysicalFilterContext::new_opt`] returns `None` (column not in the +/// projected arrow schema). This is safe because the DataFusion `FilterExec` above the +/// reader always re-applies the original predicate, so the prefilter pass is purely a +/// pruning hint. +/// +/// Tag and timestamp predicates that lower to [`SimpleFilterEvaluator`] are an +/// exception — the engine enforces them precisely, so the prefilter pass is the only +/// place they execute. They are never silently dropped. +pub(crate) fn build_reader_filter_plan( + predicate: Option<&Predicate>, + expected_metadata: Option<&RegionMetadata>, + pre_filter_mode: PreFilterMode, + read_format: &FlatReadFormat, + parquet_schema: &SchemaDescriptor, + codec: &Arc, +) -> ReaderFilterPlan { + let Some(predicate) = predicate else { + return ReaderFilterPlan { + remaining_simple_filters: Vec::new(), + prefilter_builder: None, + }; + }; + + let metadata = read_format.metadata(); + let mut prefilter_simple_filters = Vec::new(); + let mut remaining_simple_filters = Vec::new(); + let mut prefilter_physical_filters = Vec::new(); + let mut primary_key_filters = Vec::new(); + let mut pk_filter_contexts = Vec::new(); + + // `SkipFields` keeps field predicates in the normal read path to avoid a + // second read of projected field columns, while tags/timestamp can still + // participate in prefiltering. + let field_prefilter_enabled = pre_filter_mode == PreFilterMode::All; + // When true, tag columns are encoded in the primary key column and are NOT + // stored as separate parquet columns. Tag predicates must go through PK + // decoding rather than direct column reads. + let need_pk_prefilter = !read_format.batch_has_raw_pk_columns(); + + // Whether a column can be read directly from parquet for prefiltering, + // based on its semantic type and the current mode/format. 
+ let can_direct_prefilter = |semantic_type: SemanticType| -> bool { + match semantic_type { + SemanticType::Tag => !need_pk_prefilter, + SemanticType::Field => field_prefilter_enabled, + SemanticType::Timestamp => true, + } + }; + + for expr in predicate.exprs() { + // Prefer cheap simple filters first. They also preserve `Matched` / + // `Pruned` states for columns that only exist in expected metadata. + if let Some(filter_ctx) = SimpleFilterContext::new_opt(metadata, expected_metadata, expr) { + // `Matched` and `Pruned` come from expected-metadata compatibility + // and must stay in the main filter list so later phases keep that + // outcome. + let Some(filter) = filter_ctx.filter().as_filter() else { + remaining_simple_filters.push(filter_ctx); + continue; + }; + + // If the column is stored as a separate parquet column and is already projected in the main read, + // we can evaluate the simple filter directly during prefilter. + let direct_prefilter = can_direct_prefilter(filter_ctx.semantic_type()); + if direct_prefilter { + assert!( + read_format + .arrow_schema() + .column_with_name(filter.column_name()) + .is_some(), + "Column '{}' is not present in the arrow schema {:?}", + filter.column_name(), + read_format.arrow_schema(), + ); + prefilter_simple_filters.push(filter_ctx); + continue; + } + + // Otherwise try to filter through encoded-PK matching. + if need_pk_prefilter && filter_ctx.semantic_type() == SemanticType::Tag { + primary_key_filters.push(filter.clone()); + pk_filter_contexts.push(filter_ctx); + } else { + remaining_simple_filters.push(filter_ctx); + } + continue; + } + + // Best-effort physical-filter prefilter (see fn-level doc): `new_opt` + // returning `None` means the column is not in the projected arrow + // schema, and dropping the predicate is safe because the upper + // `FilterExec` re-applies it. 
+ if let Some(filter) = + PhysicalFilterContext::new_opt(metadata, expected_metadata, read_format, expr) + && can_direct_prefilter(filter.semantic_type()) + { + prefilter_physical_filters.push(filter); + } + } + + let pk_filter_exprs = + (!primary_key_filters.is_empty()).then_some(Arc::new(primary_key_filters)); + let prefilter_builder = PrefilterContextBuilder::new( + read_format, + codec, + pk_filter_exprs, + prefilter_simple_filters.clone(), + prefilter_physical_filters, + parquet_schema, + ); + + if prefilter_builder.is_some() { + ReaderFilterPlan { + remaining_simple_filters, + prefilter_builder, + } + } else { + // If prefilter setup is not worthwhile, keep the original simple + // filters on the normal path so behavior is unchanged. + remaining_simple_filters.extend(prefilter_simple_filters); + remaining_simple_filters.extend(pk_filter_contexts); + ReaderFilterPlan { + remaining_simple_filters, + prefilter_builder: None, + } + } +} + +/// Context for prefiltering a row group. pub(crate) struct PrefilterContext { - /// PK filter instance. - pk_filter: Box, - /// Projection mask for reading only the PK column. - pk_projection: ProjectionMask, - /// Index of the PK column within the prefilter projection batch. - /// This is 0 when we project only the PK column. - pk_column_index: usize, + /// Projection mask for reading prefilter columns. + projection: ProjectionMask, + /// Optional PK filter for legacy primary-key-format parquet. + pk_filter: Option>, + /// Simple filters that can be evaluated directly from the prefilter batch. + filters: Vec, + /// Physical filters that can be evaluated directly from the prefilter batch. + /// Physical expressions are only applied in the prefilter phase. + physical_filters: Vec, } /// Pre-built state for constructing [PrefilterContext] per row group. @@ -236,74 +419,103 @@ pub(crate) struct PrefilterContext { /// are computed once. 
A fresh [PrefilterContext] with its own mutable PK filter /// is created via [PrefilterContextBuilder::build()] for each row group. pub(crate) struct PrefilterContextBuilder { - pk_projection: ProjectionMask, - pk_column_index: usize, + projection: ProjectionMask, + pk_filters: Option>>, + filters: Vec, + physical_filters: Vec, codec: Arc, metadata: RegionMetadataRef, - pk_filters: Arc>, } impl PrefilterContextBuilder { /// Creates a builder if prefiltering is applicable. /// /// Returns `None` if: - /// - No primary key filters are available - /// - The read format doesn't use flat layout with dictionary-encoded PKs - /// - The primary key is empty + /// - The read format doesn't use flat layout + /// - No prefilter columns are selected + /// - Prefilter would read the full projection without any PK filter pub(crate) fn new( read_format: &FlatReadFormat, codec: &Arc, - primary_key_filters: Option<&Arc>>, + primary_key_filters: Option>>, + filters: Vec, + physical_filters: Vec, parquet_schema: &SchemaDescriptor, ) -> Option { - let pk_filters = primary_key_filters?; - if pk_filters.is_empty() { - return None; - } - let metadata = read_format.metadata(); - if metadata.primary_key.is_empty() { + let use_raw_tag_columns = read_format.batch_has_raw_pk_columns(); + let pk_filters = (!use_raw_tag_columns) + .then_some(primary_key_filters) + .flatten() + .filter(|filters| !filters.is_empty()); + + let mut prefilter_column_names = HashSet::new(); + for filter_ctx in &filters { + if let MaybeFilter::Filter(filter) = filter_ctx.filter() { + prefilter_column_names.insert(filter.column_name().to_string()); + } + } + + if pk_filters.is_some() { + prefilter_column_names.insert(PRIMARY_KEY_COLUMN_NAME.to_string()); + } + + for filter_ctx in &physical_filters { + prefilter_column_names.insert(filter_ctx.column_name().to_string()); + } + + let (projection, prefilter_count) = compute_projection_mask( + &prefilter_column_names, + read_format.arrow_schema(), + parquet_schema, + ); + + 
if prefilter_count == 0 { return None; } - // Only perform PK prefiltering for primary-key-to-flat conversion path. - if read_format.batch_has_raw_pk_columns() { + let total_count = read_format.projection_indices().len(); + let remaining_count = total_count.saturating_sub(prefilter_count); + if pk_filters.is_none() && prefilter_count >= total_count { return None; } - // Compute PK-only projection mask. - let num_parquet_columns = parquet_schema.num_columns(); - let pk_index = primary_key_column_index(num_parquet_columns); - let pk_projection = ProjectionMask::roots(parquet_schema, [pk_index]); - - // The PK column is the only column in the projection, so its index is 0. - let pk_column_index = 0; + if pk_filters.is_none() + && !should_use_prefilter(prefilter_count, remaining_count, total_count) + { + return None; + } Some(Self { - pk_projection, - pk_column_index, + projection, + pk_filters, + filters, + physical_filters, codec: Arc::clone(codec), metadata: metadata.clone(), - pk_filters: Arc::clone(pk_filters), }) } /// Builds a [PrefilterContext] for a specific row group. pub(crate) fn build(&self) -> PrefilterContext { - // Parquet PK prefilter always supports the partition column. Only - // PartitionTreeMemtable skips it after partition pruning. 
- let pk_filter = - self.codec - .primary_key_filter(&self.metadata, Arc::clone(&self.pk_filters), false); - let pk_filter = Box::new(CachedPrimaryKeyFilter::new(pk_filter)); + let pk_filter = self.pk_filters.as_ref().map(|pk_filters| { + let pk_filter = + self.codec + .primary_key_filter(&self.metadata, Arc::clone(pk_filters), false); + Box::new(CachedPrimaryKeyFilter::new(pk_filter)) as Box + }); PrefilterContext { + projection: self.projection.clone(), pk_filter, - pk_projection: self.pk_projection.clone(), - pk_column_index: self.pk_column_index, + filters: self.filters.clone(), + physical_filters: self.physical_filters.clone(), } } } +const PREFILTER_COLUMN_RATIO_THRESHOLD: f64 = 0.5; +const PREFILTER_MIN_REMAINING_COLUMNS: usize = 2; + /// Result of prefiltering a row group. pub(crate) struct PrefilterResult { /// Refined row selection after prefiltering. @@ -316,65 +528,88 @@ pub(crate) struct PrefilterResult { /// /// Reads only the prefilter columns (currently the PK dictionary column), /// applies filters, and returns a refined [RowSelection]. 
+fn compute_projection_mask( + column_names: &HashSet, + arrow_schema: &datatypes::arrow::datatypes::SchemaRef, + parquet_schema: &SchemaDescriptor, +) -> (ProjectionMask, usize) { + let mut projection_indices: Vec = column_names + .iter() + .filter_map(|name| arrow_schema.column_with_name(name).map(|(index, _)| index)) + .collect(); + projection_indices.sort_unstable(); + projection_indices.dedup(); + let count = projection_indices.len(); + ( + ProjectionMask::roots(parquet_schema, projection_indices.iter().copied()), + count, + ) +} + +fn should_use_prefilter( + prefilter_count: usize, + remaining_count: usize, + total_count: usize, +) -> bool { + if remaining_count == 0 { + return false; + } + + if remaining_count < PREFILTER_MIN_REMAINING_COLUMNS { + return false; + } + + let ratio = prefilter_count as f64 / total_count as f64; + ratio <= PREFILTER_COLUMN_RATIO_THRESHOLD +} + pub(crate) async fn execute_prefilter( prefilter_ctx: &mut PrefilterContext, reader_builder: &RowGroupReaderBuilder, build_ctx: &RowGroupBuildContext<'_>, ) -> Result { - // Reads PK column only. - let mut pk_stream = reader_builder + let mut stream = reader_builder .build_with_projection( build_ctx.row_group_idx, build_ctx.row_selection.clone(), - prefilter_ctx.pk_projection.clone(), + prefilter_ctx.projection.clone(), build_ctx.fetch_metrics, ) .await?; - // Applies PK filter to each batch and collect matching row ranges. 
- let mut matched_row_ranges: Vec> = Vec::new(); - let mut row_offset = 0; + let mut filter_arrays = Vec::new(); let mut rows_before_filter = 0usize; + let mut rows_selected = 0usize; - while let Some(batch_result) = pk_stream.next().await { + while let Some(batch_result) = stream.next().await { let batch = batch_result.context(ReadParquetSnafu { path: reader_builder.file_path(), })?; - let batch_num_rows = batch.num_rows(); - if batch_num_rows == 0 { + let num_rows = batch.num_rows(); + if num_rows == 0 { continue; } - rows_before_filter += batch_num_rows; + rows_before_filter += num_rows; - let ranges = matching_row_ranges_by_primary_key( + let batch_mask = match apply_filters_to_batch( &batch, - prefilter_ctx.pk_column_index, - prefilter_ctx.pk_filter.as_mut(), - )?; - matched_row_ranges.extend( - ranges - .into_iter() - .map(|range| (range.start + row_offset)..(range.end + row_offset)), - ); - row_offset += batch_num_rows; + &mut prefilter_ctx.pk_filter, + &prefilter_ctx.filters, + &prefilter_ctx.physical_filters, + reader_builder.file_path(), + )? { + Some(mask) => mask, + None => BooleanBuffer::new_unset(num_rows), + }; + rows_selected += batch_mask.count_set_bits(); + filter_arrays.push(BooleanArray::from(batch_mask)); } - // Converts matched ranges to RowSelection. - let rows_selected: usize = matched_row_ranges.iter().map(|r| r.end - r.start).sum(); let filtered_rows = rows_before_filter.saturating_sub(rows_selected); - - let refined_selection = if rows_selected == 0 { + let refined_selection = if filter_arrays.is_empty() || rows_selected == 0 { RowSelection::from(vec![]) } else { - // Build the prefilter selection relative to the yielded rows - // (not total_rows), since matched_row_ranges are offsets within - // the rows actually read from the stream. 
- let prefilter_selection = - row_selection_from_row_ranges_exact(matched_row_ranges.into_iter(), rows_before_filter); - - // Use and_then to apply prefilter selection within the context - // of the original selection, since prefilter offsets are relative - // to the original selection's selected rows. + let prefilter_selection = RowSelection::from_filters(&filter_arrays); match &build_ctx.row_selection { Some(original) => original.and_then(&prefilter_selection), None => prefilter_selection, @@ -387,6 +622,102 @@ pub(crate) async fn execute_prefilter( }) } +fn apply_filters_to_batch( + batch: &RecordBatch, + pk_filter: &mut Option>, + filters: &[SimpleFilterContext], + physical_filters: &[PhysicalFilterContext], + file_path: &str, +) -> Result> { + let mut mask = BooleanBuffer::new_set(batch.num_rows()); + + if let Some(pk_filter) = pk_filter.as_mut() { + // Prefilter reads a reduced projection. For PK prefilter, the encoded + // primary key column is always appended as the last projected column, + // while `__sequence` and `__op_type` are not read. 
+ let pk_column_index = batch.num_columns() - 1; + let matched_row_ranges = + matching_row_ranges_by_primary_key(batch, pk_column_index, pk_filter.as_mut())?; + let mut builder = BooleanBufferBuilder::new(batch.num_rows()); + builder.append_n(batch.num_rows(), false); + for range in matched_row_ranges { + for row in range { + builder.set_bit(row, true); + } + } + mask = mask.bitand(&builder.finish()); + } + + for filter_ctx in filters { + let filter = match filter_ctx.filter() { + MaybeFilter::Filter(filter) => filter, + MaybeFilter::Matched => continue, + MaybeFilter::Pruned => return Ok(None), + }; + + let (idx, _) = batch + .schema() + .column_with_name(filter.column_name()) + .with_context(|| UnexpectedSnafu { + reason: format!( + "Prefilter column '{}' (id {}) not found in batch for file {}", + filter.column_name(), + filter_ctx.column_id(), + file_path + ), + })?; + let column = batch.column(idx).clone(); + let result = filter.evaluate_array(&column).context(RecordBatchSnafu)?; + mask = mask.bitand(&result); + } + + for filter_ctx in physical_filters { + let filter = filter_ctx.filter(); + + let (idx, _) = batch + .schema() + .column_with_name(filter_ctx.column_name()) + .with_context(|| UnexpectedSnafu { + reason: format!( + "Prefilter physical column '{}' (id {}) not found in batch for file {}", + filter_ctx.column_name(), + filter_ctx.column_id(), + file_path + ), + })?; + let column = batch.column(idx).clone(); + + let record_batch = RecordBatch::try_new(filter_ctx.schema().clone(), vec![column]) + .context(NewRecordBatchSnafu)?; + let evaluated = filter + .evaluate(&record_batch) + .context(EvalPartitionFilterSnafu)?; + let array = evaluated + .into_array(record_batch.num_rows()) + .context(EvalPartitionFilterSnafu)?; + let boolean_array = + array + .as_any() + .downcast_ref::<BooleanArray>() + .context(UnexpectedSnafu { + reason: "Failed to downcast physical filter result to BooleanArray", + })?; + // Treat null results as false (filtered out); value bits are not 
guaranteed + // to be false for invalid entries. + let mut result = boolean_array.values().clone(); + if let Some(nulls) = boolean_array.nulls() { + result = result.bitand(nulls.inner()); + } + mask = mask.bitand(&result); + } + + if mask.count_set_bits() == 0 { + Ok(None) + } else { + Ok(Some(mask)) + } +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -399,47 +730,17 @@ mod tests { }; use datatypes::arrow::datatypes::{Schema, UInt32Type}; use mito_codec::row_converter::{PrimaryKeyFilter, build_primary_key_codec}; + use parquet::arrow::ArrowSchemaConverter; use store_api::codec::PrimaryKeyEncoding; use super::*; use crate::sst::internal_fields; - use crate::sst::parquet::flat_format::FlatReadFormat; + use crate::sst::parquet::flat_format::{FlatReadFormat, primary_key_column_index}; use crate::test_util::sst_util::{ - new_primary_key, sst_region_metadata, sst_region_metadata_with_encoding, + new_primary_key, new_record_batch_with_custom_sequence, sst_region_metadata, + sst_region_metadata_with_encoding, }; - #[test] - fn test_is_usable_primary_key_filter_skips_legacy_primary_key_batches() { - let metadata = Arc::new(sst_region_metadata_with_encoding( - PrimaryKeyEncoding::Sparse, - )); - let read_format = FlatReadFormat::new( - metadata.clone(), - metadata.column_metadatas.iter().map(|c| c.column_id), - None, - "test", - true, - ) - .unwrap(); - assert!(!read_format.batch_has_raw_pk_columns()); - - let filter = SimpleFilterEvaluator::try_new(&col("tag_0").eq(lit("b"))).unwrap(); - assert!(is_usable_primary_key_filter(&metadata, None, &filter)); - } - - #[test] - fn test_is_usable_primary_key_filter_supports_partition_column_by_default() { - let metadata = Arc::new(sst_region_metadata_with_encoding( - PrimaryKeyEncoding::Sparse, - )); - let filter = SimpleFilterEvaluator::try_new( - &col(store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME).eq(lit(1_u32)), - ) - .unwrap(); - - assert!(is_usable_primary_key_filter(&metadata, None, &filter)); - } - 
struct CountingPrimaryKeyFilter { hits: Arc, expected: Vec, @@ -479,6 +780,33 @@ mod tests { .collect() } + fn new_simple_filter_contexts( + metadata: &RegionMetadataRef, + exprs: &[datafusion_expr::Expr], + ) -> Vec { + exprs + .iter() + .filter_map(|expr| SimpleFilterContext::new_opt(metadata, None, expr)) + .collect() + } + + fn new_physical_filter_contexts( + metadata: &RegionMetadataRef, + read_format: &FlatReadFormat, + exprs: &[datafusion_expr::Expr], + ) -> Vec { + exprs + .iter() + .filter_map(|expr| PhysicalFilterContext::new_opt(metadata, None, read_format, expr)) + .collect() + } + + fn parquet_schema(read_format: &FlatReadFormat) -> SchemaDescriptor { + ArrowSchemaConverter::new() + .convert(read_format.arrow_schema()) + .unwrap() + } + fn new_raw_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch { assert_eq!(primary_keys.len(), field_values.len()); @@ -530,6 +858,53 @@ mod tests { .unwrap() } + fn new_prefilter_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch { + assert_eq!(primary_keys.len(), field_values.len()); + + let metadata = Arc::new(sst_region_metadata()); + let arrow_schema = metadata.schema.arrow_schema(); + let field_column = arrow_schema + .field(arrow_schema.index_of("field_0").unwrap()) + .clone(); + let time_index_column = arrow_schema + .field(arrow_schema.index_of("ts").unwrap()) + .clone(); + let schema = Arc::new(Schema::new(vec![ + field_column, + time_index_column, + internal_fields()[0].as_ref().clone(), + ])); + + let mut dict_values = Vec::new(); + let mut keys = Vec::with_capacity(primary_keys.len()); + for pk in primary_keys { + let key = dict_values + .iter() + .position(|existing: &&[u8]| existing == pk) + .unwrap_or_else(|| { + dict_values.push(*pk); + dict_values.len() - 1 + }); + keys.push(key as u32); + } + let pk_array: ArrayRef = Arc::new(DictionaryArray::::new( + UInt32Array::from(keys), + Arc::new(BinaryArray::from_iter_values(dict_values.iter().copied())), + )); + + 
RecordBatch::try_new( + schema, + vec![ + Arc::new(UInt64Array::from(field_values.to_vec())), + Arc::new(TimestampMillisecondArray::from_iter_values( + 0..primary_keys.len() as i64, + )), + pk_array, + ], + ) + .unwrap() + } + fn field_values(batch: &RecordBatch) -> Vec { batch .column(0) @@ -540,6 +915,13 @@ mod tests { .to_vec() } + fn remaining_simple_filter_columns(filters: &[SimpleFilterContext]) -> Vec<&str> { + filters + .iter() + .map(|filter_ctx| filter_ctx.filter().as_filter().unwrap().column_name()) + .collect() + } + #[test] fn test_prefilter_primary_key_drops_single_dictionary_batch() { let metadata = Arc::new(sst_region_metadata()); @@ -592,4 +974,236 @@ mod tests { assert_eq!(filtered.num_rows(), 4); assert_eq!(field_values(&filtered), vec![10, 11, 14, 15]); } + + #[test] + fn test_prefilter_builder_returns_none_without_selected_filters() { + let metadata: RegionMetadataRef = + Arc::new(sst_region_metadata_with_encoding(PrimaryKeyEncoding::Dense)); + let read_format = FlatReadFormat::new( + metadata.clone(), + metadata.column_metadatas.iter().map(|c| c.column_id), + None, + "test", + false, + ) + .unwrap(); + let codec = build_primary_key_codec(metadata.as_ref()); + let parquet_schema = parquet_schema(&read_format); + + let builder = PrefilterContextBuilder::new( + &read_format, + &codec, + None, + Vec::new(), + Vec::new(), + &parquet_schema, + ); + assert!(builder.is_none()); + } + + #[test] + fn test_should_use_prefilter() { + assert!(should_use_prefilter(1, 5, 6)); + assert!(!should_use_prefilter(1, 0, 1)); + assert!(!should_use_prefilter(1, 1, 2)); + assert!(!should_use_prefilter(4, 3, 7)); + assert!(should_use_prefilter(3, 3, 6)); + } + + #[test] + fn test_build_bulk_filter_plan_classifies_filters_across_read_paths() { + let metadata: RegionMetadataRef = Arc::new(sst_region_metadata_with_encoding( + PrimaryKeyEncoding::Sparse, + )); + let legacy_read_format = FlatReadFormat::new( + metadata.clone(), + metadata.column_metadatas.iter().map(|c| 
c.column_id), + None, + "memtable", + false, + ) + .unwrap(); + assert!(!legacy_read_format.batch_has_raw_pk_columns()); + + let plan = build_bulk_filter_plan( + &legacy_read_format, + Some(&Predicate::new(vec![ + col("tag_0").eq(lit("a")), + col("field_0").gt(lit(1_u64)), + ])), + ); + assert_eq!( + plan.pk_filters.as_ref().map(|filters| filters.len()), + Some(1) + ); + assert_eq!( + remaining_simple_filter_columns(&plan.remaining_simple_filters), + vec!["field_0"] + ); + + let metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); + let raw_pk_read_format = FlatReadFormat::new( + metadata.clone(), + metadata.column_metadatas.iter().map(|c| c.column_id), + None, + "memtable", + true, + ) + .unwrap(); + assert!(raw_pk_read_format.batch_has_raw_pk_columns()); + + let tag_only_plan = build_bulk_filter_plan( + &raw_pk_read_format, + Some(&Predicate::new(vec![col("tag_0").eq(lit("a"))])), + ); + assert!(tag_only_plan.pk_filters.is_none()); + assert_eq!( + remaining_simple_filter_columns(&tag_only_plan.remaining_simple_filters), + vec!["tag_0"] + ); + + let field_only_plan = build_bulk_filter_plan( + &raw_pk_read_format, + Some(&Predicate::new(vec![col("field_0").gt(lit(1_u64))])), + ); + assert!(field_only_plan.pk_filters.is_none()); + assert_eq!( + remaining_simple_filter_columns(&field_only_plan.remaining_simple_filters), + vec!["field_0"] + ); + } + + #[test] + fn test_build_reader_filter_plan_classifies_filters_for_prefilter_modes() { + let metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); + let full_read_format = FlatReadFormat::new( + metadata.clone(), + metadata.column_metadatas.iter().map(|c| c.column_id), + None, + "test", + true, + ) + .unwrap(); + let full_parquet_schema = parquet_schema(&full_read_format); + let codec = build_primary_key_codec(metadata.as_ref()); + + let skip_fields_plan = build_reader_filter_plan( + Some(&Predicate::new(vec![ + col("tag_0").eq(lit("a")), + col("field_0").gt(lit(1_u64)), + ])), + None, + 
PreFilterMode::SkipFields, + &full_read_format, + &full_parquet_schema, + &codec, + ); + assert!(skip_fields_plan.prefilter_builder.is_some()); + assert_eq!( + remaining_simple_filter_columns(&skip_fields_plan.remaining_simple_filters), + vec!["field_0"] + ); + + let field_0 = metadata.column_by_name("field_0").unwrap().column_id; + let ts = metadata.time_index_column().column_id; + let projected_read_format = FlatReadFormat::new( + metadata.clone(), + [field_0, ts].into_iter(), + None, + "test", + true, + ) + .unwrap(); + let projected_parquet_schema = parquet_schema(&projected_read_format); + let pk_prefilter_plan = build_reader_filter_plan( + Some(&Predicate::new(vec![col("tag_0").eq(lit("a"))])), + None, + PreFilterMode::All, + &projected_read_format, + &projected_parquet_schema, + &codec, + ); + assert!(pk_prefilter_plan.prefilter_builder.is_some()); + assert!(pk_prefilter_plan.remaining_simple_filters.is_empty()); + } + + #[test] + fn test_apply_filters_to_batch_uses_flat_tag_columns_directly() { + let metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); + let filters = new_simple_filter_contexts(&metadata, &[col("tag_0").eq(lit("a"))]); + let batch = new_record_batch_with_custom_sequence(&["a", "x"], 0, 4, 1); + + let mut no_pk_filter = None; + let mask = apply_filters_to_batch(&batch, &mut no_pk_filter, &filters, &[], "test") + .unwrap() + .unwrap(); + assert_eq!(mask.count_set_bits(), 4); + } + + #[test] + fn test_apply_filters_to_batch_errors_on_missing_selected_column() { + let metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); + let filters = new_simple_filter_contexts(&metadata, &[col("tag_0").eq(lit("a"))]); + let pk = new_primary_key(&["a", "x"]); + let batch = new_raw_batch(&[pk.as_slice()], &[10]); + + let mut no_pk_filter = None; + let err = + apply_filters_to_batch(&batch, &mut no_pk_filter, &filters, &[], "test").unwrap_err(); + let err = err.to_string(); + assert!(err.contains("Prefilter column")); + 
assert!(err.contains("tag_0")); + } + + #[test] + fn test_apply_filters_to_batch_evaluates_physical_filters() { + let metadata: RegionMetadataRef = + Arc::new(sst_region_metadata_with_encoding(PrimaryKeyEncoding::Dense)); + let read_format = FlatReadFormat::new( + metadata.clone(), + metadata.column_metadatas.iter().map(|c| c.column_id), + None, + "test", + false, + ) + .unwrap(); + let expr = col("field_0").in_list(vec![lit(11_u64)], false); + let physical_filters = new_physical_filter_contexts(&metadata, &read_format, &[expr]); + let pk = new_primary_key(&["a", "x"]); + let batch = new_raw_batch(&[pk.as_slice(), pk.as_slice(), pk.as_slice()], &[9, 10, 11]); + + let mut no_pk_filter = None; + let mask = + apply_filters_to_batch(&batch, &mut no_pk_filter, &[], &physical_filters, "test") + .unwrap() + .unwrap(); + assert_eq!(mask.count_set_bits(), 1); + } + + #[test] + fn test_apply_filters_to_batch_uses_last_projected_column_for_pk_prefilter() { + let metadata = Arc::new(sst_region_metadata()); + let filters = Arc::new(new_test_filters(&[col("tag_0").eq(lit("a"))])); + let mut pk_filter = Some(Box::new(CachedPrimaryKeyFilter::new( + build_primary_key_codec(metadata.as_ref()) + .primary_key_filter(&metadata, filters, false), + )) as Box); + let pk_a = new_primary_key(&["a", "x"]); + let pk_b = new_primary_key(&["b", "x"]); + let batch = new_prefilter_batch( + &[ + pk_a.as_slice(), + pk_a.as_slice(), + pk_b.as_slice(), + pk_b.as_slice(), + ], + &[10, 11, 12, 13], + ); + + let mask = apply_filters_to_batch(&batch, &mut pk_filter, &[], &[], "test") + .unwrap() + .unwrap(); + + assert_eq!(mask.count_set_bits(), 2); + } } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index def97d8f67..5688133c46 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -24,10 +24,12 @@ use std::time::{Duration, Instant}; use api::v1::SemanticType; use common_recordbatch::filter::SimpleFilterEvaluator; -use 
common_telemetry::{tracing, warn}; +use common_telemetry::{error, tracing, warn}; +use datafusion::physical_plan::PhysicalExpr; use datafusion_expr::Expr; +use datafusion_expr::utils::expr_to_columns; use datatypes::arrow::array::ArrayRef; -use datatypes::arrow::datatypes::{Field, SchemaRef}; +use datatypes::arrow::datatypes::{Field, Schema as ArrowSchema, SchemaRef}; use datatypes::arrow::record_batch::RecordBatch; use datatypes::data_type::ConcreteDataType; use datatypes::prelude::DataType; @@ -46,7 +48,6 @@ use store_api::region_request::PathType; use store_api::storage::{ColumnId, FileId}; use table::predicate::Predicate; -use self::stream::{ParquetErrorAdapter, ProjectedRecordBatchStream}; use crate::cache::index::result_cache::PredicateKey; use crate::cache::{CacheStrategy, CachedSstMeta}; #[cfg(feature = "vector_index")] @@ -79,11 +80,12 @@ use crate::sst::parquet::flat_format::FlatReadFormat; use crate::sst::parquet::format::need_override_sequence; use crate::sst::parquet::metadata::MetadataLoader; use crate::sst::parquet::prefilter::{ - PrefilterContextBuilder, execute_prefilter, is_usable_primary_key_filter, + PrefilterContextBuilder, build_reader_filter_plan, execute_prefilter, }; use crate::sst::parquet::read_columns::{ ParquetReadColumns, ProjectionMaskPlan, build_projection_plan, }; +use crate::sst::parquet::reader::stream::{ParquetErrorAdapter, ProjectedRecordBatchStream}; use crate::sst::parquet::row_group::ParquetFetchMetrics; use crate::sst::parquet::row_selection::RowGroupSelection; use crate::sst::parquet::stats::RowGroupPruningStats; @@ -450,22 +452,6 @@ impl ParquetReaderBuilder { ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options) .context(ReadDataPartSnafu)?; - let filters = if let Some(predicate) = &self.predicate { - predicate - .exprs() - .iter() - .filter_map(|expr| { - SimpleFilterContext::new_opt( - &region_meta, - self.expected_metadata.as_deref(), - expr, - ) - }) - .collect::<Vec<_>>() - } else { - vec![] - }; - let 
dyn_filters = if let Some(predicate) = &self.predicate { predicate.dyn_filters().as_ref().clone() } else { @@ -474,20 +460,13 @@ impl ParquetReaderBuilder { let codec = build_primary_key_codec(read_format.metadata()); - // Extract primary key filters from precomputed filter contexts for prefiltering. - let primary_key_filters = { - let pk_filters = filters - .iter() - .filter_map(SimpleFilterContext::primary_key_prefilter) - .collect::>(); - (!pk_filters.is_empty()).then_some(Arc::new(pk_filters)) - }; - - let prefilter_builder = PrefilterContextBuilder::new( + let filter_plan = build_reader_filter_plan( + self.predicate.as_ref(), + self.expected_metadata.as_deref(), + self.pre_filter_mode, &read_format, - &codec, - primary_key_filters.as_ref(), parquet_meta.file_metadata().schema_descr(), + &codec, ); let output_schema = read_format.output_arrow_schema()?; @@ -501,7 +480,7 @@ impl ParquetReaderBuilder { object_store: self.object_store.clone(), projection: projection_plan, cache_strategy: self.cache_strategy.clone(), - prefilter_builder, + prefilter_builder: filter_plan.prefilter_builder, }; let partition_filter = self.build_partition_filter(&read_format, &prune_schema)?; @@ -509,7 +488,7 @@ impl ParquetReaderBuilder { let context = FileRangeContext::new( reader_builder, RangeBase { - filters, + filters: filter_plan.remaining_simple_filters, dyn_filters, read_format, expected_metadata: self.expected_metadata.clone(), @@ -1657,12 +1636,6 @@ pub(crate) struct RowGroupReaderBuilder { /// Context passed to [RowGroupReaderBuilder::build()] carrying all information /// needed for prefiltering decisions. pub(crate) struct RowGroupBuildContext<'a> { - /// Simple filters pushed down. Used by prefilter on other columns. - #[allow(dead_code)] - pub(crate) filters: &'a [SimpleFilterContext], - /// Whether to skip field filters. Used by prefilter on other columns. - #[allow(dead_code)] - pub(crate) skip_fields: bool, /// Index of the row group to read. 
pub(crate) row_group_idx: usize, /// Row selection for the row group. `None` means all rows. @@ -1690,7 +1663,7 @@ impl RowGroupReaderBuilder { &self.cache_strategy } - pub(crate) fn has_flat_primary_key_prefilter(&self) -> bool { + pub(crate) fn has_predicate_prefilter(&self) -> bool { self.prefilter_builder.is_some() } @@ -1699,6 +1672,19 @@ impl RowGroupReaderBuilder { /// If prefiltering is applicable (based on `build_ctx`), this performs a two-phase read: /// 1. Reads only the prefilter columns (e.g. PK column), applies filters to get a refined row selection /// 2. Reads the full projection with the refined row selection + /// + /// The prefilter pass is *best-effort pruning*, not the precise filter for the query. + /// Predicates that cannot be lowered to prefilter columns (column not projected, + /// expression not supported, etc.) are silently skipped. Correctness rests on the + /// DataFusion `FilterExec` above this reader, which always re-applies the original + /// predicate. Tag and timestamp predicates that flow through [`SimpleFilterEvaluator`] + /// are an exception — the engine enforces them precisely, so the prefilter pass is the + /// only place they execute. See [`build_reader_filter_plan`] for the bucketing rules. + /// + /// When the prefilter result selects no rows, the second read still issues but + /// parquet-rs short-circuits before any column-chunk IO: the row-group state machine + /// jumps to `Finished` once it sees `num_rows_selected() == 0`, so no fast path is + /// added here. pub(crate) async fn build( &self, build_ctx: RowGroupBuildContext<'_>, @@ -1792,6 +1778,7 @@ impl RowGroupReaderBuilder { } } +#[derive(Clone)] /// The filter to evaluate or the prune result of the default value. pub(crate) enum MaybeFilter { /// The filter to evaluate. @@ -1802,6 +1789,17 @@ pub(crate) enum MaybeFilter { Pruned, } +impl MaybeFilter { + /// Returns the inner filter when it is available. 
+ pub(crate) fn as_filter(&self) -> Option<&SimpleFilterEvaluator> { + match self { + MaybeFilter::Filter(filter) => Some(filter), + MaybeFilter::Matched | MaybeFilter::Pruned => None, + } + } +} + +#[derive(Clone)] /// Context to evaluate the column filter for a parquet file. pub(crate) struct SimpleFilterContext { /// Filter to evaluate. @@ -1810,8 +1808,6 @@ pub(crate) struct SimpleFilterContext { column_id: ColumnId, /// Semantic type of the column. semantic_type: SemanticType, - /// Whether this filter can be applied by flat parquet primary-key prefiltering. - usable_primary_key_filter: bool, } impl SimpleFilterContext { @@ -1825,10 +1821,6 @@ impl SimpleFilterContext { expr: &Expr, ) -> Option { let filter = SimpleFilterEvaluator::try_new(expr)?; - // Parquet PK prefilter always supports the partition column. Only - // PartitionTreeMemtable skips it after partition pruning. - let usable_primary_key_filter = - is_usable_primary_key_filter(sst_meta, expected_meta, &filter); let (column_metadata, maybe_filter) = match expected_meta { Some(meta) => { // Gets the column metadata from the expected metadata. @@ -1859,14 +1851,10 @@ impl SimpleFilterContext { } }; - let usable_primary_key_filter = - matches!(maybe_filter, MaybeFilter::Filter(_)) && usable_primary_key_filter; - Some(Self { filter: maybe_filter, column_id: column_metadata.column_id, semantic_type: column_metadata.semantic_type, - usable_primary_key_filter, }) } @@ -1884,22 +1872,116 @@ impl SimpleFilterContext { pub(crate) fn semantic_type(&self) -> SemanticType { self.semantic_type } +} - /// Returns whether this filter is eligible for flat parquet PK prefiltering. - pub(crate) fn usable_primary_key_filter(&self) -> bool { - self.usable_primary_key_filter - } +/// Context to evaluate a physical expression for a parquet file. +#[derive(Clone)] +pub(crate) struct PhysicalFilterContext { + /// Filter to evaluate. + filter: Arc, + /// Id of the column to evaluate. 
+ column_id: ColumnId, + /// Name of the column to evaluate. + column_name: String, + /// Semantic type of the column. + semantic_type: SemanticType, + /// Schema containing only the referenced column. + schema: SchemaRef, +} - /// Returns the filter evaluator when it is eligible for PK prefiltering. - pub(crate) fn primary_key_prefilter(&self) -> Option { - if !self.usable_primary_key_filter { +impl PhysicalFilterContext { + /// Creates a context for the `expr`. + /// + /// Returns None if the expression doesn't reference exactly one column or the + /// column to filter doesn't exist in the SST metadata or the expected metadata. + pub(crate) fn new_opt( + sst_meta: &RegionMetadataRef, + expected_meta: Option<&RegionMetadata>, + read_format: &FlatReadFormat, + expr: &Expr, + ) -> Option { + if !Self::is_prefilter_candidate(expr) { return None; } + let column_name = Self::single_column_name(expr)?; + let column_metadata = match expected_meta { + Some(meta) => { + let column = meta.column_by_name(&column_name)?; + let sst_column = sst_meta.column_by_id(column.column_id)?; + // Physical expr requires the column name to match the SST column name. + if sst_column.column_schema.name != column_name { + return None; + } + column + } + None => sst_meta.column_by_name(&column_name)?, + }; - match &self.filter { - MaybeFilter::Filter(filter) => Some(filter.clone()), - MaybeFilter::Matched | MaybeFilter::Pruned => None, + // The column must be present in the projected arrow schema for the + // prefilter to be able to read it. 
+ let (_, field) = read_format.arrow_schema().column_with_name(&column_name)?; + let field = field.clone(); + let schema = Arc::new(ArrowSchema::new(vec![field])); + let physical_expr = Predicate::to_physical_expr(expr, &schema) + .inspect_err(|e| { + error!(e; "Unable to build physical filter for {expr}, schema: {schema:?}"); + }) + .ok()?; + + Some(Self { + filter: physical_expr, + column_id: column_metadata.column_id, + column_name, + semantic_type: column_metadata.semantic_type, + schema, + }) + } + + /// Returns true if the expression is a variant we want to evaluate as a + /// physical prefilter. Binary exprs are intentionally excluded because + /// [`SimpleFilterEvaluator`] already handles them. + // TODO(yingwen): extend more expressions if necessary. For example, allow some cheap scalar functions (e.g. `lower`, `length`, date truncations) + fn is_prefilter_candidate(expr: &Expr) -> bool { + matches!( + expr, + Expr::InList(_) | Expr::IsNull(_) | Expr::IsNotNull(_) | Expr::Between(_) + ) + } + + fn single_column_name(expr: &Expr) -> Option { + let mut columns = HashSet::new(); + if expr_to_columns(expr, &mut columns).is_err() { + return None; } + if columns.len() != 1 { + return None; + } + columns.iter().next().map(|column| column.name.clone()) + } + + /// Returns the filter to evaluate. + pub(crate) fn filter(&self) -> &Arc { + &self.filter + } + + /// Returns the column id. + pub(crate) fn column_id(&self) -> ColumnId { + self.column_id + } + + /// Returns the column name. + pub(crate) fn column_name(&self) -> &str { + &self.column_name + } + + /// Returns the semantic type of the column. + pub(crate) fn semantic_type(&self) -> SemanticType { + self.semantic_type + } + + /// Returns the schema containing only the referenced column. 
+ pub(crate) fn schema(&self) -> &SchemaRef { + &self.schema } } @@ -1956,7 +2038,6 @@ impl ParquetReader { row_group_idx, Some(row_selection), Some(&self.fetch_metrics), - skip_fields, )) .await?; self.reader = Some(FlatPruneReader::new_with_row_group_reader( @@ -1987,7 +2068,6 @@ impl ParquetReader { row_group_idx, Some(row_selection), Some(&fetch_metrics), - skip_fields, )) .await?; Some(FlatPruneReader::new_with_row_group_reader( @@ -2227,32 +2307,78 @@ mod tests { } #[test] - fn test_simple_filter_context_marks_usable_primary_key_filter() { + fn test_simple_filter_context_uses_default_value_for_mismatched_expected_metadata() { let metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); - let ctx = - SimpleFilterContext::new_opt(&metadata, None, &col("tag_0").eq(lit("a"))).unwrap(); - - assert!(ctx.usable_primary_key_filter()); - assert!(ctx.primary_key_prefilter().is_some()); - } - - #[test] - fn test_simple_filter_context_skips_non_usable_primary_key_filter() { - let metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); - - let field_ctx = - SimpleFilterContext::new_opt(&metadata, None, &col("field_0").eq(lit(1_u64))).unwrap(); - assert!(!field_ctx.usable_primary_key_filter()); - assert!(field_ctx.primary_key_prefilter().is_none()); - let expected_metadata = expected_metadata_with_reused_tag_name(metadata.as_ref()); - let mismatched_ctx = SimpleFilterContext::new_opt( + let ctx = SimpleFilterContext::new_opt( &metadata, Some(expected_metadata.as_ref()), &col("tag_0").eq(lit("a")), ) .unwrap(); - assert!(!mismatched_ctx.usable_primary_key_filter()); - assert!(mismatched_ctx.primary_key_prefilter().is_none()); + assert!(matches!( + ctx.filter(), + MaybeFilter::Matched | MaybeFilter::Pruned + )); + } + + #[test] + fn test_physical_filter_context_skips_renamed_column() { + let metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); + let expected_metadata = expected_metadata_with_reused_tag_name(metadata.as_ref()); + let read_format = 
FlatReadFormat::new( + metadata.clone(), + metadata.column_metadatas.iter().map(|c| c.column_id), + None, + "test", + true, + ) + .unwrap(); + + let ctx = PhysicalFilterContext::new_opt( + &metadata, + Some(expected_metadata.as_ref()), + &read_format, + &col("tag_0").in_list(vec![lit("a"), lit("b")], false), + ); + + assert!(ctx.is_none()); + } + + #[test] + fn test_physical_filter_context_only_accepts_prefilter_candidates() { + let metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); + let read_format = FlatReadFormat::new( + metadata.clone(), + metadata.column_metadatas.iter().map(|c| c.column_id), + None, + "test", + true, + ) + .unwrap(); + + // InList is on the allowlist — should build a context. + let in_list = col("tag_0").in_list(vec![lit("a"), lit("b")], false); + assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &in_list).is_some()); + + // NOT IN uses the same variant with `negated: true` — also accepted. + let not_in = col("tag_0").in_list(vec![lit("a"), lit("b")], true); + assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &not_in).is_some()); + + // IS NULL / IS NOT NULL are accepted. + let is_null = col("tag_0").is_null(); + assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &is_null).is_some()); + let is_not_null = col("tag_0").is_not_null(); + assert!( + PhysicalFilterContext::new_opt(&metadata, None, &read_format, &is_not_null).is_some() + ); + + // BETWEEN is accepted. + let between = col("field_0").between(lit(1_u64), lit(10_u64)); + assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &between).is_some()); + + // Binary expr is handled by SimpleFilterEvaluator — rejected here. 
+ let binary = col("tag_0").eq(lit("a")); + assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &binary).is_none()); } } diff --git a/src/mito2/src/sst/parquet/row_selection.rs b/src/mito2/src/sst/parquet/row_selection.rs index 763e244ef2..cf28734ffc 100644 --- a/src/mito2/src/sst/parquet/row_selection.rs +++ b/src/mito2/src/sst/parquet/row_selection.rs @@ -567,26 +567,6 @@ pub(crate) fn row_selection_from_row_ranges( RowSelection::from(selectors) } -/// Like [`row_selection_from_row_ranges`] but guarantees the resulting selection -/// covers exactly `total_row_count` rows by appending a trailing skip if needed. -/// -/// Required when the result is used as the inner operand of [`RowSelection::and_then`], because -/// `and_then` expects the inner selection to account for every row selected by the outer one. -pub(crate) fn row_selection_from_row_ranges_exact( - row_ranges: impl Iterator>, - total_row_count: usize, -) -> RowSelection { - let (mut selectors, last_processed_end) = - build_selectors_from_row_ranges(row_ranges, total_row_count); - if last_processed_end < total_row_count { - // Preserve the full logical length of the selection even when the final rows are all - // filtered out. Without this trailing skip, `and_then` sees an undersized inner - // selection and panics. 
- add_or_merge_selector(&mut selectors, total_row_count - last_processed_end, true); - } - RowSelection::from(selectors) -} - fn build_selectors_from_row_ranges( row_ranges: impl Iterator>, total_row_count: usize, @@ -739,56 +719,6 @@ mod tests { assert_eq!(selection, expected); } - #[test] - fn test_exact_single_range_with_trailing_skip() { - let selection = row_selection_from_row_ranges_exact(Some(0..3).into_iter(), 6); - let expected = RowSelection::from(vec![RowSelector::select(3), RowSelector::skip(3)]); - assert_eq!(selection, expected); - assert_eq!(selection.row_count(), 3); - } - - #[test] - fn test_exact_non_contiguous_ranges() { - let ranges = [1..3, 5..8]; - let selection = row_selection_from_row_ranges_exact(ranges.iter().cloned(), 10); - let expected = RowSelection::from(vec![ - RowSelector::skip(1), - RowSelector::select(2), - RowSelector::skip(2), - RowSelector::select(3), - RowSelector::skip(2), - ]); - assert_eq!(selection, expected); - assert_eq!(selection.row_count(), 5); - } - - #[test] - fn test_exact_empty_ranges() { - let selection = row_selection_from_row_ranges_exact([].iter().cloned(), 10); - let expected = RowSelection::from(vec![RowSelector::skip(10)]); - assert_eq!(selection, expected); - assert_eq!(selection.row_count(), 0); - } - - #[test] - fn test_exact_range_covers_all_rows() { - let selection = row_selection_from_row_ranges_exact(Some(0..10).into_iter(), 10); - let expected = RowSelection::from(vec![RowSelector::select(10)]); - assert_eq!(selection, expected); - assert_eq!(selection.row_count(), 10); - } - - #[test] - fn test_exact_compatible_with_and_then() { - // Outer selects rows 0..6 out of 10. - let outer = RowSelection::from(vec![RowSelector::select(6), RowSelector::skip(4)]); - // Inner: within those 6 rows, select only rows 0..3. 
- let inner = row_selection_from_row_ranges_exact(Some(0..3).into_iter(), 6); - let result = outer.and_then(&inner); - let expected = RowSelection::from(vec![RowSelector::select(3), RowSelector::skip(7)]); - assert_eq!(result, expected); - } - #[test] fn test_row_ids_to_selection() { let row_ids = [1, 3, 5, 7, 9].into_iter(); diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs index 2c9ac41560..daf404d5d6 100644 --- a/src/table/src/predicate.rs +++ b/src/table/src/predicate.rs @@ -120,11 +120,11 @@ impl Predicate { .context(error::DatafusionSnafu) } - /// Builds physical exprs according to provided schema. - pub fn to_physical_exprs( - &self, + /// Builds a single physical expr according to provided schema. + pub fn to_physical_expr( + expr: &Expr, schema: &arrow::datatypes::SchemaRef, - ) -> error::Result>> { + ) -> error::Result> { let df_schema = schema .clone() .to_dfschema_ref() @@ -135,12 +135,21 @@ impl Predicate { // registering variables. let execution_props = &ExecutionProps::new(); + create_physical_expr(expr, df_schema.as_ref(), execution_props) + .context(error::DatafusionSnafu) + } + + /// Builds physical exprs according to provided schema. + pub fn to_physical_exprs( + &self, + schema: &arrow::datatypes::SchemaRef, + ) -> error::Result>> { let dyn_filters = self.dyn_filter_phy_exprs()?; Ok(self .exprs .iter() - .filter_map(|expr| create_physical_expr(expr, df_schema.as_ref(), execution_props).ok()) + .filter_map(|expr| Self::to_physical_expr(expr, schema).ok()) .chain(dyn_filters) .collect::>()) } @@ -730,5 +739,8 @@ mod tests { let predicates = predicate.to_physical_exprs(&schema).unwrap(); assert!(!predicates.is_empty()); + + let physical_expr = Predicate::to_physical_expr(&col("host").eq(lit("host_a")), &schema); + assert!(physical_expr.is_ok()); } }