feat: support prefiltering any columns in flat format (#7972)

* refactor: prepare parquet prefilter for multi-column execution

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: restore parquet physical filter contexts

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add generalized parquet prefilter projection

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: avoid re-evaluating parquet prefiltered predicates

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: cover generalized parquet prefilter behavior

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: remove variant

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: only prefilter physical exprs

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: remove execute_general_prefilter

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: only prefilter cheap exprs

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: context usage

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: categorize filters

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: prefilter plan for bulk memtable

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: move parquet filter plan builders into prefilter

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: comment

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: simplify tests

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: enable prefilter by threshold

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: correct pk filter grouping

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: update comment

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: remove unused code

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fix warning

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: handle nulls in physical filter result

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fmt import

Signed-off-by: evenyag <realevenyag@gmail.com>

* docs: update comments

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2026-04-29 11:35:22 +08:00
committed by GitHub
parent 8955e2c651
commit 6a84393e08
7 changed files with 1037 additions and 391 deletions

View File

@@ -27,8 +27,7 @@ use table::predicate::Predicate;
use crate::error::Result;
use crate::sst::parquet::file_range::{PreFilterMode, RangeBase};
use crate::sst::parquet::flat_format::FlatReadFormat;
use crate::sst::parquet::prefilter::CachedPrimaryKeyFilter;
use crate::sst::parquet::reader::SimpleFilterContext;
use crate::sst::parquet::prefilter::{CachedPrimaryKeyFilter, build_bulk_filter_plan};
use crate::sst::parquet::stats::RowGroupPruningStats;
pub(crate) type BulkIterContextRef = Arc<BulkIterContext>;
@@ -66,17 +65,6 @@ impl BulkIterContext {
) -> Result<Self> {
let codec = build_primary_key_codec(&region_metadata);
let simple_filters: Vec<SimpleFilterContext> = predicate
.as_ref()
.iter()
.flat_map(|predicate| {
predicate
.exprs()
.iter()
.filter_map(|expr| SimpleFilterContext::new_opt(&region_metadata, None, expr))
})
.collect();
let read_format = if let Some(column_ids) = projection {
FlatReadFormat::new(
region_metadata.clone(),
@@ -103,12 +91,11 @@ impl BulkIterContext {
.map(|pred| pred.dyn_filters().as_ref().clone())
.unwrap_or_default();
// Pre-extract PK filters if applicable.
let pk_filters = Self::extract_pk_filters(&read_format, &simple_filters);
let filter_plan = build_bulk_filter_plan(&read_format, predicate.as_ref());
Ok(Self {
base: RangeBase {
filters: simple_filters,
filters: filter_plan.remaining_simple_filters,
dyn_filters,
read_format,
prune_schema: region_metadata.schema.clone(),
@@ -121,7 +108,7 @@ impl BulkIterContext {
partition_filter: None,
},
predicate,
pk_filters,
pk_filters: filter_plan.pk_filters,
})
}
@@ -153,30 +140,6 @@ impl BulkIterContext {
}
}
/// Extracts PK filters if flat format with dictionary-encoded PKs is used.
fn extract_pk_filters(
read_format: &FlatReadFormat,
filters: &[SimpleFilterContext],
) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
if read_format.batch_has_raw_pk_columns() {
return None;
}
let metadata = read_format.metadata();
if metadata.primary_key.is_empty() {
return None;
}
let pk_filters: Vec<_> = filters
.iter()
.filter_map(|f| f.primary_key_prefilter())
.collect();
if pk_filters.is_empty() {
return None;
}
Some(Arc::new(pk_filters))
}
/// Builds a fresh PK filter for a new iterator. Returns `None` if PK
/// prefiltering is not applicable.
pub(crate) fn build_pk_filter(&self) -> Option<CachedPrimaryKeyFilter> {

View File

@@ -380,7 +380,6 @@ fn apply_combined_filters(
metrics: &mut MemScanMetricsData,
) -> error::Result<Option<RecordBatch>> {
// Apply PK prefilter on raw batch before convert_batch to reduce conversion overhead.
let has_pk_prefilter = pk_filter.is_some();
let record_batch = if let Some(pk_filter) = pk_filter {
let rows_before = record_batch.num_rows();
let prefilter_start = Instant::now();
@@ -413,7 +412,6 @@ fn apply_combined_filters(
let predicate_mask = context.base.compute_filter_mask_flat(
&record_batch,
skip_fields,
has_pk_prefilter,
&mut tag_decode_state,
)?;
// If predicate filters out the entire batch, return None early

View File

@@ -172,7 +172,6 @@ impl FileRange {
self.row_group_idx,
self.row_selection.clone(),
fetch_metrics,
skip_fields,
))
.await?;
@@ -199,7 +198,7 @@ impl FileRange {
FlatRowGroupReader::new(self.context.clone(), parquet_reader);
// Flat PK prefilter makes the input stream predicate-dependent, so cached
// selector results are not reusable across queries with different filters.
let cache_strategy = if self.context.reader_builder.has_flat_primary_key_prefilter() {
let cache_strategy = if self.context.reader_builder.has_predicate_prefilter() {
CacheStrategy::Disabled
} else {
self.context.reader_builder.cache_strategy().clone()
@@ -297,16 +296,13 @@ impl FileRangeContext {
/// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch.
/// If a partition expr filter is configured, it is also applied.
/// Physical filter exprs are not evaluated here; they are only applied during prefiltering.
pub(crate) fn precise_filter_flat(
&self,
input: RecordBatch,
skip_fields: bool,
) -> Result<Option<RecordBatch>> {
self.base.precise_filter_flat(
input,
skip_fields,
self.reader_builder.has_flat_primary_key_prefilter(),
)
self.base.precise_filter_flat(input, skip_fields)
}
pub(crate) fn pre_filter_mode(&self) -> PreFilterMode {
@@ -325,11 +321,8 @@ impl FileRangeContext {
row_group_idx: usize,
row_selection: Option<RowSelection>,
fetch_metrics: Option<&'a ParquetFetchMetrics>,
skip_fields: bool,
) -> RowGroupBuildContext<'a> {
RowGroupBuildContext {
filters: &self.base.filters,
skip_fields,
row_group_idx,
row_selection,
fetch_metrics,
@@ -344,7 +337,7 @@ impl FileRangeContext {
}
/// Mode to pre-filter columns in a range.
#[derive(Debug, Clone, Copy, PartialEq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PreFilterMode {
/// Filters all columns.
All,
@@ -415,15 +408,9 @@ impl RangeBase {
&self,
input: RecordBatch,
skip_fields: bool,
skip_prefiltered_pk_filters: bool,
) -> Result<Option<RecordBatch>> {
let mut tag_decode_state = TagDecodeState::new();
let mask = self.compute_filter_mask_flat(
&input,
skip_fields,
skip_prefiltered_pk_filters,
&mut tag_decode_state,
)?;
let mask = self.compute_filter_mask_flat(&input, skip_fields, &mut tag_decode_state)?;
// If mask is None, the entire batch is filtered out
let Some(mut mask) = mask else {
@@ -458,6 +445,7 @@ impl RangeBase {
/// Computes the filter mask for the input RecordBatch based on pushed down predicates.
/// If a partition expr filter is configured, it is applied later in `precise_filter_flat` but **NOT** in this function.
/// Physical filter exprs are excluded here and only apply during prefiltering.
///
/// Returns `None` if the entire batch is filtered out, otherwise returns the boolean mask.
///
@@ -468,7 +456,6 @@ impl RangeBase {
&self,
input: &RecordBatch,
skip_fields: bool,
skip_prefiltered_pk_filters: bool,
tag_decode_state: &mut TagDecodeState,
) -> Result<Option<BooleanBuffer>> {
let mut mask = BooleanBuffer::new_set(input.num_rows());
@@ -490,12 +477,6 @@ impl RangeBase {
continue;
}
// Flat parquet PK prefiltering already applied these tag predicates while refining
// row selection, so skip them here to avoid decoding/evaluating the same condition twice.
if skip_prefiltered_pk_filters && filter_ctx.usable_primary_key_filter() {
continue;
}
// Get the column directly by its projected index.
// If the column is missing and it's not a tag/time column, this filter is skipped.
// Assumes the projection indices align with the input batch schema.
@@ -703,7 +684,7 @@ mod tests {
}
#[test]
fn test_compute_filter_mask_flat_skips_prefiltered_pk_filters() {
fn test_compute_filter_mask_flat_applies_remaining_simple_filters() {
let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
let filters = vec![
SimpleFilterContext::new_opt(&metadata, None, &col("tag_0").eq(lit("a"))).unwrap(),
@@ -712,16 +693,38 @@ mod tests {
let base = new_test_range_base(filters);
let batch = new_record_batch_with_custom_sequence(&["b", "x"], 0, 4, 1);
let mask_without_skip = base
.compute_filter_mask_flat(&batch, false, false, &mut TagDecodeState::new())
let mask = base
.compute_filter_mask_flat(&batch, false, &mut TagDecodeState::new())
.unwrap()
.unwrap();
assert_eq!(mask_without_skip.count_set_bits(), 0);
assert_eq!(mask.count_set_bits(), 0);
}
let mask_with_skip = base
.compute_filter_mask_flat(&batch, false, true, &mut TagDecodeState::new())
#[test]
fn test_compute_filter_mask_flat_does_not_postfilter_physical_filters() {
let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
let read_format = FlatReadFormat::new(
metadata.clone(),
metadata.column_metadatas.iter().map(|c| c.column_id),
None,
"test",
true,
)
.unwrap();
let physical_filter = crate::sst::parquet::reader::PhysicalFilterContext::new_opt(
&metadata,
None,
&read_format,
&col("field_0").in_list(vec![lit(1_u64), lit(2_u64)], false),
);
assert!(physical_filter.is_some());
let base = new_test_range_base(vec![]);
let batch = new_record_batch_with_custom_sequence(&["b", "x"], 0, 4, 1);
let mask = base
.compute_filter_mask_flat(&batch, false, &mut TagDecodeState::new())
.unwrap()
.unwrap();
assert_eq!(mask_with_skip.count_set_bits(), 2);
assert_eq!(mask.count_set_bits(), 4);
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -24,10 +24,12 @@ use std::time::{Duration, Instant};
use api::v1::SemanticType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{tracing, warn};
use common_telemetry::{error, tracing, warn};
use datafusion::physical_plan::PhysicalExpr;
use datafusion_expr::Expr;
use datafusion_expr::utils::expr_to_columns;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::datatypes::{Field, SchemaRef};
use datatypes::arrow::datatypes::{Field, Schema as ArrowSchema, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
@@ -46,7 +48,6 @@ use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId};
use table::predicate::Predicate;
use self::stream::{ParquetErrorAdapter, ProjectedRecordBatchStream};
use crate::cache::index::result_cache::PredicateKey;
use crate::cache::{CacheStrategy, CachedSstMeta};
#[cfg(feature = "vector_index")]
@@ -79,11 +80,12 @@ use crate::sst::parquet::flat_format::FlatReadFormat;
use crate::sst::parquet::format::need_override_sequence;
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::prefilter::{
PrefilterContextBuilder, execute_prefilter, is_usable_primary_key_filter,
PrefilterContextBuilder, build_reader_filter_plan, execute_prefilter,
};
use crate::sst::parquet::read_columns::{
ParquetReadColumns, ProjectionMaskPlan, build_projection_plan,
};
use crate::sst::parquet::reader::stream::{ParquetErrorAdapter, ProjectedRecordBatchStream};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
@@ -450,22 +452,6 @@ impl ParquetReaderBuilder {
ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options)
.context(ReadDataPartSnafu)?;
let filters = if let Some(predicate) = &self.predicate {
predicate
.exprs()
.iter()
.filter_map(|expr| {
SimpleFilterContext::new_opt(
&region_meta,
self.expected_metadata.as_deref(),
expr,
)
})
.collect::<Vec<_>>()
} else {
vec![]
};
let dyn_filters = if let Some(predicate) = &self.predicate {
predicate.dyn_filters().as_ref().clone()
} else {
@@ -474,20 +460,13 @@ impl ParquetReaderBuilder {
let codec = build_primary_key_codec(read_format.metadata());
// Extract primary key filters from precomputed filter contexts for prefiltering.
let primary_key_filters = {
let pk_filters = filters
.iter()
.filter_map(SimpleFilterContext::primary_key_prefilter)
.collect::<Vec<_>>();
(!pk_filters.is_empty()).then_some(Arc::new(pk_filters))
};
let prefilter_builder = PrefilterContextBuilder::new(
let filter_plan = build_reader_filter_plan(
self.predicate.as_ref(),
self.expected_metadata.as_deref(),
self.pre_filter_mode,
&read_format,
&codec,
primary_key_filters.as_ref(),
parquet_meta.file_metadata().schema_descr(),
&codec,
);
let output_schema = read_format.output_arrow_schema()?;
@@ -501,7 +480,7 @@ impl ParquetReaderBuilder {
object_store: self.object_store.clone(),
projection: projection_plan,
cache_strategy: self.cache_strategy.clone(),
prefilter_builder,
prefilter_builder: filter_plan.prefilter_builder,
};
let partition_filter = self.build_partition_filter(&read_format, &prune_schema)?;
@@ -509,7 +488,7 @@ impl ParquetReaderBuilder {
let context = FileRangeContext::new(
reader_builder,
RangeBase {
filters,
filters: filter_plan.remaining_simple_filters,
dyn_filters,
read_format,
expected_metadata: self.expected_metadata.clone(),
@@ -1657,12 +1636,6 @@ pub(crate) struct RowGroupReaderBuilder {
/// Context passed to [RowGroupReaderBuilder::build()] carrying all information
/// needed for prefiltering decisions.
pub(crate) struct RowGroupBuildContext<'a> {
/// Simple filters pushed down. Used by prefilter on other columns.
#[allow(dead_code)]
pub(crate) filters: &'a [SimpleFilterContext],
/// Whether to skip field filters. Used by prefilter on other columns.
#[allow(dead_code)]
pub(crate) skip_fields: bool,
/// Index of the row group to read.
pub(crate) row_group_idx: usize,
/// Row selection for the row group. `None` means all rows.
@@ -1690,7 +1663,7 @@ impl RowGroupReaderBuilder {
&self.cache_strategy
}
pub(crate) fn has_flat_primary_key_prefilter(&self) -> bool {
pub(crate) fn has_predicate_prefilter(&self) -> bool {
self.prefilter_builder.is_some()
}
@@ -1699,6 +1672,19 @@ impl RowGroupReaderBuilder {
/// If prefiltering is applicable (based on `build_ctx`), this performs a two-phase read:
/// 1. Reads only the prefilter columns (e.g. PK column), applies filters to get a refined row selection
/// 2. Reads the full projection with the refined row selection
///
/// The prefilter pass is *best-effort pruning*, not the precise filter for the query.
/// Predicates that cannot be lowered to prefilter columns (column not projected,
/// expression not supported, etc.) are silently skipped. Correctness rests on the
/// DataFusion `FilterExec` above this reader, which always re-applies the original
/// predicate. Tag and timestamp predicates that flow through [`SimpleFilterEvaluator`]
/// are an exception — the engine enforces them precisely, so the prefilter pass is the
/// only place they execute. See [`build_reader_filter_plan`] for the bucketing rules.
///
/// When the prefilter result selects no rows, the second read still issues but
/// parquet-rs short-circuits before any column-chunk IO: the row-group state machine
/// jumps to `Finished` once it sees `num_rows_selected() == 0`, so no fast path is
/// added here.
pub(crate) async fn build(
&self,
build_ctx: RowGroupBuildContext<'_>,
@@ -1792,6 +1778,7 @@ impl RowGroupReaderBuilder {
}
}
#[derive(Clone)]
/// The filter to evaluate or the prune result of the default value.
pub(crate) enum MaybeFilter {
/// The filter to evaluate.
@@ -1802,6 +1789,17 @@ pub(crate) enum MaybeFilter {
Pruned,
}
impl MaybeFilter {
/// Returns the inner filter when it is available.
pub(crate) fn as_filter(&self) -> Option<&SimpleFilterEvaluator> {
match self {
MaybeFilter::Filter(filter) => Some(filter),
MaybeFilter::Matched | MaybeFilter::Pruned => None,
}
}
}
#[derive(Clone)]
/// Context to evaluate the column filter for a parquet file.
pub(crate) struct SimpleFilterContext {
/// Filter to evaluate.
@@ -1810,8 +1808,6 @@ pub(crate) struct SimpleFilterContext {
column_id: ColumnId,
/// Semantic type of the column.
semantic_type: SemanticType,
/// Whether this filter can be applied by flat parquet primary-key prefiltering.
usable_primary_key_filter: bool,
}
impl SimpleFilterContext {
@@ -1825,10 +1821,6 @@ impl SimpleFilterContext {
expr: &Expr,
) -> Option<Self> {
let filter = SimpleFilterEvaluator::try_new(expr)?;
// Parquet PK prefilter always supports the partition column. Only
// PartitionTreeMemtable skips it after partition pruning.
let usable_primary_key_filter =
is_usable_primary_key_filter(sst_meta, expected_meta, &filter);
let (column_metadata, maybe_filter) = match expected_meta {
Some(meta) => {
// Gets the column metadata from the expected metadata.
@@ -1859,14 +1851,10 @@ impl SimpleFilterContext {
}
};
let usable_primary_key_filter =
matches!(maybe_filter, MaybeFilter::Filter(_)) && usable_primary_key_filter;
Some(Self {
filter: maybe_filter,
column_id: column_metadata.column_id,
semantic_type: column_metadata.semantic_type,
usable_primary_key_filter,
})
}
@@ -1884,22 +1872,116 @@ impl SimpleFilterContext {
pub(crate) fn semantic_type(&self) -> SemanticType {
self.semantic_type
}
}
/// Returns whether this filter is eligible for flat parquet PK prefiltering.
pub(crate) fn usable_primary_key_filter(&self) -> bool {
self.usable_primary_key_filter
}
/// Context to evaluate a physical expression for a parquet file.
#[derive(Clone)]
pub(crate) struct PhysicalFilterContext {
/// Filter to evaluate.
filter: Arc<dyn PhysicalExpr>,
/// Id of the column to evaluate.
column_id: ColumnId,
/// Name of the column to evaluate.
column_name: String,
/// Semantic type of the column.
semantic_type: SemanticType,
/// Schema containing only the referenced column.
schema: SchemaRef,
}
/// Returns the filter evaluator when it is eligible for PK prefiltering.
pub(crate) fn primary_key_prefilter(&self) -> Option<SimpleFilterEvaluator> {
if !self.usable_primary_key_filter {
impl PhysicalFilterContext {
/// Creates a context for the `expr`.
///
/// Returns None if the expression doesn't reference exactly one column or the
/// column to filter doesn't exist in the SST metadata or the expected metadata.
pub(crate) fn new_opt(
sst_meta: &RegionMetadataRef,
expected_meta: Option<&RegionMetadata>,
read_format: &FlatReadFormat,
expr: &Expr,
) -> Option<Self> {
if !Self::is_prefilter_candidate(expr) {
return None;
}
let column_name = Self::single_column_name(expr)?;
let column_metadata = match expected_meta {
Some(meta) => {
let column = meta.column_by_name(&column_name)?;
let sst_column = sst_meta.column_by_id(column.column_id)?;
// Physical expr requires the column name to match the SST column name.
if sst_column.column_schema.name != column_name {
return None;
}
column
}
None => sst_meta.column_by_name(&column_name)?,
};
match &self.filter {
MaybeFilter::Filter(filter) => Some(filter.clone()),
MaybeFilter::Matched | MaybeFilter::Pruned => None,
// The column must be present in the projected arrow schema for the
// prefilter to be able to read it.
let (_, field) = read_format.arrow_schema().column_with_name(&column_name)?;
let field = field.clone();
let schema = Arc::new(ArrowSchema::new(vec![field]));
let physical_expr = Predicate::to_physical_expr(expr, &schema)
.inspect_err(|e| {
error!(e; "Unable to build physical filter for {expr}, schema: {schema:?}");
})
.ok()?;
Some(Self {
filter: physical_expr,
column_id: column_metadata.column_id,
column_name,
semantic_type: column_metadata.semantic_type,
schema,
})
}
/// Returns true if the expression is a variant we want to evaluate as a
/// physical prefilter. Binary exprs are intentionally excluded because
/// [`SimpleFilterEvaluator`] already handles them.
// TODO(yingwen): extend more expressions if necessary. For example, allow some cheap scalar functions (e.g. `lower`, `length`, date truncations)
fn is_prefilter_candidate(expr: &Expr) -> bool {
matches!(
expr,
Expr::InList(_) | Expr::IsNull(_) | Expr::IsNotNull(_) | Expr::Between(_)
)
}
fn single_column_name(expr: &Expr) -> Option<String> {
let mut columns = HashSet::new();
if expr_to_columns(expr, &mut columns).is_err() {
return None;
}
if columns.len() != 1 {
return None;
}
columns.iter().next().map(|column| column.name.clone())
}
/// Returns the filter to evaluate.
pub(crate) fn filter(&self) -> &Arc<dyn PhysicalExpr> {
&self.filter
}
/// Returns the column id.
pub(crate) fn column_id(&self) -> ColumnId {
self.column_id
}
/// Returns the column name.
pub(crate) fn column_name(&self) -> &str {
&self.column_name
}
/// Returns the semantic type of the column.
pub(crate) fn semantic_type(&self) -> SemanticType {
self.semantic_type
}
/// Returns the schema containing only the referenced column.
pub(crate) fn schema(&self) -> &SchemaRef {
&self.schema
}
}
@@ -1956,7 +2038,6 @@ impl ParquetReader {
row_group_idx,
Some(row_selection),
Some(&self.fetch_metrics),
skip_fields,
))
.await?;
self.reader = Some(FlatPruneReader::new_with_row_group_reader(
@@ -1987,7 +2068,6 @@ impl ParquetReader {
row_group_idx,
Some(row_selection),
Some(&fetch_metrics),
skip_fields,
))
.await?;
Some(FlatPruneReader::new_with_row_group_reader(
@@ -2227,32 +2307,78 @@ mod tests {
}
#[test]
fn test_simple_filter_context_marks_usable_primary_key_filter() {
fn test_simple_filter_context_uses_default_value_for_mismatched_expected_metadata() {
let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
let ctx =
SimpleFilterContext::new_opt(&metadata, None, &col("tag_0").eq(lit("a"))).unwrap();
assert!(ctx.usable_primary_key_filter());
assert!(ctx.primary_key_prefilter().is_some());
}
#[test]
fn test_simple_filter_context_skips_non_usable_primary_key_filter() {
let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
let field_ctx =
SimpleFilterContext::new_opt(&metadata, None, &col("field_0").eq(lit(1_u64))).unwrap();
assert!(!field_ctx.usable_primary_key_filter());
assert!(field_ctx.primary_key_prefilter().is_none());
let expected_metadata = expected_metadata_with_reused_tag_name(metadata.as_ref());
let mismatched_ctx = SimpleFilterContext::new_opt(
let ctx = SimpleFilterContext::new_opt(
&metadata,
Some(expected_metadata.as_ref()),
&col("tag_0").eq(lit("a")),
)
.unwrap();
assert!(!mismatched_ctx.usable_primary_key_filter());
assert!(mismatched_ctx.primary_key_prefilter().is_none());
assert!(matches!(
ctx.filter(),
MaybeFilter::Matched | MaybeFilter::Pruned
));
}
#[test]
fn test_physical_filter_context_skips_renamed_column() {
let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
let expected_metadata = expected_metadata_with_reused_tag_name(metadata.as_ref());
let read_format = FlatReadFormat::new(
metadata.clone(),
metadata.column_metadatas.iter().map(|c| c.column_id),
None,
"test",
true,
)
.unwrap();
let ctx = PhysicalFilterContext::new_opt(
&metadata,
Some(expected_metadata.as_ref()),
&read_format,
&col("tag_0").in_list(vec![lit("a"), lit("b")], false),
);
assert!(ctx.is_none());
}
#[test]
fn test_physical_filter_context_only_accepts_prefilter_candidates() {
let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
let read_format = FlatReadFormat::new(
metadata.clone(),
metadata.column_metadatas.iter().map(|c| c.column_id),
None,
"test",
true,
)
.unwrap();
// InList is on the allowlist — should build a context.
let in_list = col("tag_0").in_list(vec![lit("a"), lit("b")], false);
assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &in_list).is_some());
// NOT IN uses the same variant with `negated: true` — also accepted.
let not_in = col("tag_0").in_list(vec![lit("a"), lit("b")], true);
assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &not_in).is_some());
// IS NULL / IS NOT NULL are accepted.
let is_null = col("tag_0").is_null();
assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &is_null).is_some());
let is_not_null = col("tag_0").is_not_null();
assert!(
PhysicalFilterContext::new_opt(&metadata, None, &read_format, &is_not_null).is_some()
);
// BETWEEN is accepted.
let between = col("field_0").between(lit(1_u64), lit(10_u64));
assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &between).is_some());
// Binary expr is handled by SimpleFilterEvaluator — rejected here.
let binary = col("tag_0").eq(lit("a"));
assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &binary).is_none());
}
}

View File

@@ -567,26 +567,6 @@ pub(crate) fn row_selection_from_row_ranges(
RowSelection::from(selectors)
}
/// Like [`row_selection_from_row_ranges`] but guarantees the resulting selection
/// covers exactly `total_row_count` rows by appending a trailing skip if needed.
///
/// Required when the result is used as the inner operand of [`RowSelection::and_then`], because
/// `and_then` expects the inner selection to account for every row selected by the outer one.
pub(crate) fn row_selection_from_row_ranges_exact(
row_ranges: impl Iterator<Item = Range<usize>>,
total_row_count: usize,
) -> RowSelection {
let (mut selectors, last_processed_end) =
build_selectors_from_row_ranges(row_ranges, total_row_count);
if last_processed_end < total_row_count {
// Preserve the full logical length of the selection even when the final rows are all
// filtered out. Without this trailing skip, `and_then` sees an undersized inner
// selection and panics.
add_or_merge_selector(&mut selectors, total_row_count - last_processed_end, true);
}
RowSelection::from(selectors)
}
fn build_selectors_from_row_ranges(
row_ranges: impl Iterator<Item = Range<usize>>,
total_row_count: usize,
@@ -739,56 +719,6 @@ mod tests {
assert_eq!(selection, expected);
}
#[test]
fn test_exact_single_range_with_trailing_skip() {
let selection = row_selection_from_row_ranges_exact(Some(0..3).into_iter(), 6);
let expected = RowSelection::from(vec![RowSelector::select(3), RowSelector::skip(3)]);
assert_eq!(selection, expected);
assert_eq!(selection.row_count(), 3);
}
#[test]
fn test_exact_non_contiguous_ranges() {
let ranges = [1..3, 5..8];
let selection = row_selection_from_row_ranges_exact(ranges.iter().cloned(), 10);
let expected = RowSelection::from(vec![
RowSelector::skip(1),
RowSelector::select(2),
RowSelector::skip(2),
RowSelector::select(3),
RowSelector::skip(2),
]);
assert_eq!(selection, expected);
assert_eq!(selection.row_count(), 5);
}
#[test]
fn test_exact_empty_ranges() {
let selection = row_selection_from_row_ranges_exact([].iter().cloned(), 10);
let expected = RowSelection::from(vec![RowSelector::skip(10)]);
assert_eq!(selection, expected);
assert_eq!(selection.row_count(), 0);
}
#[test]
fn test_exact_range_covers_all_rows() {
let selection = row_selection_from_row_ranges_exact(Some(0..10).into_iter(), 10);
let expected = RowSelection::from(vec![RowSelector::select(10)]);
assert_eq!(selection, expected);
assert_eq!(selection.row_count(), 10);
}
#[test]
fn test_exact_compatible_with_and_then() {
// Outer selects rows 0..6 out of 10.
let outer = RowSelection::from(vec![RowSelector::select(6), RowSelector::skip(4)]);
// Inner: within those 6 rows, select only rows 0..3.
let inner = row_selection_from_row_ranges_exact(Some(0..3).into_iter(), 6);
let result = outer.and_then(&inner);
let expected = RowSelection::from(vec![RowSelector::select(3), RowSelector::skip(7)]);
assert_eq!(result, expected);
}
#[test]
fn test_row_ids_to_selection() {
let row_ids = [1, 3, 5, 7, 9].into_iter();

View File

@@ -120,11 +120,11 @@ impl Predicate {
.context(error::DatafusionSnafu)
}
/// Builds physical exprs according to provided schema.
pub fn to_physical_exprs(
&self,
/// Builds a single physical expr according to provided schema.
pub fn to_physical_expr(
expr: &Expr,
schema: &arrow::datatypes::SchemaRef,
) -> error::Result<Vec<Arc<dyn PhysicalExpr>>> {
) -> error::Result<Arc<dyn PhysicalExpr>> {
let df_schema = schema
.clone()
.to_dfschema_ref()
@@ -135,12 +135,21 @@ impl Predicate {
// registering variables.
let execution_props = &ExecutionProps::new();
create_physical_expr(expr, df_schema.as_ref(), execution_props)
.context(error::DatafusionSnafu)
}
/// Builds physical exprs according to provided schema.
pub fn to_physical_exprs(
&self,
schema: &arrow::datatypes::SchemaRef,
) -> error::Result<Vec<Arc<dyn PhysicalExpr>>> {
let dyn_filters = self.dyn_filter_phy_exprs()?;
Ok(self
.exprs
.iter()
.filter_map(|expr| create_physical_expr(expr, df_schema.as_ref(), execution_props).ok())
.filter_map(|expr| Self::to_physical_expr(expr, schema).ok())
.chain(dyn_filters)
.collect::<Vec<_>>())
}
@@ -730,5 +739,8 @@ mod tests {
let predicates = predicate.to_physical_exprs(&schema).unwrap();
assert!(!predicates.is_empty());
let physical_expr = Predicate::to_physical_expr(&col("host").eq(lit("host_a")), &schema);
assert!(physical_expr.is_ok());
}
}