feat: file range dynamic filter (#7441)

* feat: add dynamic filtering support in file range and predicate handling

Signed-off-by: discord9 <discord9@163.com>

* clippy

Signed-off-by: discord9 <discord9@163.com>

* c

Signed-off-by: discord9 <discord9@163.com>

* c

Signed-off-by: discord9 <discord9@163.com>

* per review

Signed-off-by: discord9 <discord9@163.com>

* per review

Signed-off-by: discord9 <discord9@163.com>

* pcr

Signed-off-by: discord9 <discord9@163.com>

* c

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-12-23 14:15:30 +08:00
committed by GitHub
parent 342eb47e19
commit b3bc3c76f1
5 changed files with 139 additions and 27 deletions

View File

@@ -74,7 +74,7 @@ impl BulkIterContext {
.collect();
let read_format = ReadFormat::new(
region_metadata,
region_metadata.clone(),
projection,
true,
None,
@@ -82,10 +82,18 @@ impl BulkIterContext {
skip_auto_convert,
)?;
let dyn_filters = predicate
.as_ref()
.map(|pred| pred.dyn_filters().clone())
.unwrap_or_default();
Ok(Self {
base: RangeBase {
filters: simple_filters,
dyn_filters,
read_format,
prune_schema: region_metadata.schema.clone(),
expected_metadata: Some(region_metadata),
codec,
// we don't need to compat batch since all batch in memtable have the same schema.
compat_batch: None,

View File

@@ -1181,7 +1181,9 @@ pub fn build_file_range_scan_stream(
};
for range in ranges {
let build_reader_start = Instant::now();
let reader = range.reader(stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await?;
let Some(reader) = range.reader(stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await? else {
continue;
};
let build_cost = build_reader_start.elapsed();
part_metrics.inc_build_reader_cost(build_cost);
let compat_batch = range.compat_batch();
@@ -1239,7 +1241,7 @@ pub fn build_flat_file_range_scan_stream(
};
for range in ranges {
let build_reader_start = Instant::now();
let mut reader = range.flat_reader(fetch_metrics.as_deref()).await?;
let Some(mut reader) = range.flat_reader(fetch_metrics.as_deref()).await? else{continue};
let build_cost = build_reader_start.elapsed();
part_metrics.inc_build_reader_cost(build_cost);

View File

@@ -21,15 +21,19 @@ use std::sync::Arc;
use api::v1::{OpType, SemanticType};
use common_telemetry::error;
use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
use datatypes::arrow::array::{ArrayRef, BooleanArray};
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::Schema;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use parquet::arrow::arrow_reader::RowSelection;
use parquet::file::metadata::ParquetMetaData;
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, TimeSeriesRowSelector};
use table::predicate::Predicate;
use crate::error::{
ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, RecordBatchSnafu,
@@ -46,6 +50,7 @@ use crate::sst::parquet::reader::{
FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
use crate::sst::parquet::stats::RowGroupPruningStats;
/// Checks if a row group contains delete operations by examining the min value of op_type column.
///
@@ -114,12 +119,62 @@ impl FileRange {
row_selection.row_count() == rows_in_group as usize
}
/// Prunes this [FileRange] against the current values of the dynamic filters,
/// using the latest dynamic filters together with row group statistics.
///
/// Returns false when the whole range is filtered out and reading it can be skipped.
fn in_dynamic_filter_range(&self) -> bool {
    let dyn_filters = &self.context.base.dyn_filters;
    if dyn_filters.is_empty() {
        // Nothing to prune with.
        return true;
    }

    let row_group_meta = self
        .context
        .reader_builder
        .parquet_metadata()
        .row_group(self.row_group_idx);
    let stats = RowGroupPruningStats::new(
        std::slice::from_ref(row_group_meta),
        self.context.read_format(),
        self.context.base.expected_metadata.clone(),
        self.compute_skip_fields(),
    );
    // Building a predicate here is cheap: the dynamic filters are behind an Arc.
    let predicate = Predicate::new(vec![]).with_dyn_filters(dyn_filters.clone());
    let keep = predicate
        .prune_with_stats(&stats, self.context.base.prune_schema.arrow_schema())
        .first()
        .copied();
    // No verdict from pruning is unexpected; err on the side of reading the range.
    keep.unwrap_or(true)
}
/// Decides whether field columns can be skipped when pre-filtering this range,
/// based on the configured [PreFilterMode].
fn compute_skip_fields(&self) -> bool {
    match &self.context.base.pre_filter_mode {
        // Pre-filter on all columns: never skip fields.
        PreFilterMode::All => false,
        // Unconditionally skip fields.
        PreFilterMode::SkipFields => true,
        // Skip fields only when this specific row group contains delete ops;
        // if that check fails we conservatively assume it does.
        PreFilterMode::SkipFieldsOnDelete => row_group_contains_delete(
            self.context.reader_builder.parquet_metadata(),
            self.row_group_idx,
            self.context.reader_builder.file_path(),
        )
        .unwrap_or(true),
    }
}
/// Returns a reader to read the [FileRange].
pub(crate) async fn reader(
&self,
selector: Option<TimeSeriesRowSelector>,
fetch_metrics: Option<&ParquetFetchMetrics>,
) -> Result<PruneReader> {
) -> Result<Option<PruneReader>> {
if !self.in_dynamic_filter_range() {
return Ok(None);
}
let parquet_reader = self
.context
.reader_builder
@@ -170,14 +225,17 @@ impl FileRange {
)
};
Ok(prune_reader)
Ok(Some(prune_reader))
}
/// Creates a flat reader that returns RecordBatch.
pub(crate) async fn flat_reader(
&self,
fetch_metrics: Option<&ParquetFetchMetrics>,
) -> Result<FlatPruneReader> {
) -> Result<Option<FlatPruneReader>> {
if !self.in_dynamic_filter_range() {
return Ok(None);
}
let parquet_reader = self
.context
.reader_builder
@@ -198,7 +256,7 @@ impl FileRange {
skip_fields,
);
Ok(flat_prune_reader)
Ok(Some(flat_prune_reader))
}
/// Returns the helper to compat batches.
@@ -224,22 +282,10 @@ pub(crate) type FileRangeContextRef = Arc<FileRangeContext>;
impl FileRangeContext {
/// Creates a new [FileRangeContext].
pub(crate) fn new(
reader_builder: RowGroupReaderBuilder,
filters: Vec<SimpleFilterContext>,
read_format: ReadFormat,
codec: Arc<dyn PrimaryKeyCodec>,
pre_filter_mode: PreFilterMode,
) -> Self {
pub(crate) fn new(reader_builder: RowGroupReaderBuilder, base: RangeBase) -> Self {
Self {
reader_builder,
base: RangeBase {
filters,
read_format,
codec,
compat_batch: None,
pre_filter_mode,
},
base,
}
}
@@ -323,8 +369,13 @@ pub enum PreFilterMode {
pub(crate) struct RangeBase {
/// Filters pushed down.
pub(crate) filters: Vec<SimpleFilterContext>,
/// Dynamic filter physical exprs.
pub(crate) dyn_filters: Arc<Vec<DynamicFilterPhysicalExpr>>,
/// Helper to read the SST.
pub(crate) read_format: ReadFormat,
pub(crate) expected_metadata: Option<RegionMetadataRef>,
/// Schema used for pruning with dynamic filters.
pub(crate) prune_schema: Arc<Schema>,
/// Decoder for primary keys
pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
/// Optional helper to compat batches.

View File

@@ -62,7 +62,7 @@ use crate::sst::index::inverted_index::applier::{
InvertedIndexApplierRef, InvertedIndexApplyMetrics,
};
use crate::sst::parquet::file_range::{
FileRangeContext, FileRangeContextRef, PreFilterMode, row_group_contains_delete,
FileRangeContext, FileRangeContextRef, PreFilterMode, RangeBase, row_group_contains_delete,
};
use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
use crate::sst::parquet::metadata::MetadataLoader;
@@ -342,6 +342,12 @@ impl ParquetReaderBuilder {
);
}
let prune_schema = self
.expected_metadata
.as_ref()
.map(|meta| meta.schema.clone())
.unwrap_or_else(|| region_meta.schema.clone());
let reader_builder = RowGroupReaderBuilder {
file_handle: self.file_handle.clone(),
file_path,
@@ -368,14 +374,26 @@ impl ParquetReaderBuilder {
vec![]
};
let dyn_filters = if let Some(predicate) = &self.predicate {
predicate.dyn_filters().clone()
} else {
Arc::new(vec![])
};
let codec = build_primary_key_codec(read_format.metadata());
let context = FileRangeContext::new(
reader_builder,
filters,
read_format,
codec,
self.pre_filter_mode,
RangeBase {
filters,
dyn_filters,
read_format,
expected_metadata: self.expected_metadata.clone(),
prune_schema,
codec,
compat_batch: None,
pre_filter_mode: self.pre_filter_mode,
},
);
metrics.build_cost += start.elapsed();

View File

@@ -25,6 +25,7 @@ use datafusion_common::pruning::PruningStatistics;
use datafusion_expr::expr::{Expr, InList};
use datafusion_expr::{Between, BinaryExpr, Operator};
use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
use datafusion_physical_expr::{PhysicalExpr, create_physical_expr};
use datatypes::arrow;
use datatypes::value::scalar_value_to_timestamp;
@@ -51,11 +52,15 @@ macro_rules! return_none_if_utf8 {
};
}
/// Reference-counted pointer to a list of logical exprs.
/// Reference-counted pointer to a list of logical exprs and a list of dynamic filter physical exprs.
#[derive(Debug, Clone)]
pub struct Predicate {
    /// Logical exprs pushed down by the query engine.
    exprs: Arc<Vec<Expr>>,
    /// Dynamic filter physical exprs; only used when dynamic filtering is enabled.
    ///
    /// These usually come from `TopK` or `Join` operators. They can filter data
    /// dynamically during query execution, using current runtime information to
    /// further reduce the amount of data scanned.
    dyn_filters: Arc<Vec<DynamicFilterPhysicalExpr>>,
}
impl Predicate {
@@ -65,6 +70,15 @@ impl Predicate {
pub fn new(exprs: Vec<Expr>) -> Self {
    Self {
        exprs: exprs.into(),
        // No dynamic filters by default; attach them via `with_dyn_filters`.
        dyn_filters: Arc::default(),
    }
}
/// Sets the dynamic filter physical exprs, consuming `self`.
pub fn with_dyn_filters(self, dyn_filters: Arc<Vec<DynamicFilterPhysicalExpr>>) -> Self {
    // Struct-update syntax keeps the remaining field (`exprs`) as-is.
    Self { dyn_filters, ..self }
}
@@ -73,6 +87,22 @@ impl Predicate {
&self.exprs
}
/// Returns the dynamic filter physical exprs.
///
/// Note that the returned filters are "live": their contents may keep changing
/// while the query executes. Use [`Self::dyn_filter_phy_exprs`] for a snapshot.
pub fn dyn_filters(&self) -> &Arc<Vec<DynamicFilterPhysicalExpr>> {
    &self.dyn_filters
}
/// Returns the dynamic filter as physical exprs. Notice this return a "snapshot" of
/// dynamic filters at the time of calling this method.
pub fn dyn_filter_phy_exprs(&self) -> error::Result<Vec<Arc<dyn PhysicalExpr>>> {
self.dyn_filters
.iter()
.map(|e| e.current())
.collect::<Result<Vec<_>, _>>()
.context(error::DatafusionSnafu)
}
/// Builds physical exprs according to provided schema.
pub fn to_physical_exprs(
&self,
@@ -88,10 +118,13 @@ impl Predicate {
// registering variables.
let execution_props = &ExecutionProps::new();
let dyn_filters = self.dyn_filter_phy_exprs()?;
Ok(self
.exprs
.iter()
.filter_map(|expr| create_physical_expr(expr, df_schema.as_ref(), execution_props).ok())
.chain(dyn_filters)
.collect::<Vec<_>>())
}