From 22d9eb69301e9ff4b1a9ded4685a47541b125a11 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Tue, 28 Oct 2025 10:44:29 +0800 Subject: [PATCH] feat: part sort provide dyn filter (#7140) * feat: part sort provide dyn filter Signed-off-by: discord9 * fix: reset_state reset dynamic filter Signed-off-by: discord9 --------- Signed-off-by: discord9 --- src/query/src/part_sort.rs | 79 +++++++++++++++++++++++++++++++++++-- src/table/src/table/scan.rs | 13 ++++++ 2 files changed, 88 insertions(+), 4 deletions(-) diff --git a/src/query/src/part_sort.rs b/src/query/src/part_sort.rs index 64ba76a149..ebf4fddc1e 100644 --- a/src/query/src/part_sort.rs +++ b/src/query/src/part_sort.rs @@ -30,14 +30,18 @@ use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream}; use datafusion::common::arrow::compute::sort_to_indices; use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion::execution::{RecordBatchStream, TaskContext}; +use datafusion::physical_plan::execution_plan::CardinalityEffect; +use datafusion::physical_plan::filter_pushdown::{ + ChildFilterDescription, FilterDescription, FilterPushdownPhase, +}; use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, TopK, TopKDynamicFilters, }; use datafusion_common::{DataFusionError, internal_err}; -use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit}; +use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; use futures::{Stream, StreamExt}; use itertools::Itertools; use parking_lot::RwLock; @@ -61,6 +65,10 @@ pub struct PartSortExec { metrics: ExecutionPlanMetricsSet, partition_ranges: Vec>, properties: PlanProperties, + /// Filter matching the state of the sort for dynamic filter pushdown. + /// If `limit` is `Some`, this will also be set and a TopK operator may be used. + /// If `limit` is `None`, this will be `None`. + filter: Option>>, } impl PartSortExec { @@ -79,6 +87,10 @@ impl PartSortExec { properties.boundedness, ); + let filter = limit + .is_some() + .then(|| Self::create_filter(expression.expr.clone())); + Self { expression, limit, @@ -86,9 +98,17 @@ impl PartSortExec { metrics, partition_ranges, properties, + filter, } } + /// Add or reset `self.filter` to a new `TopKDynamicFilters`. + fn create_filter(expr: Arc) -> Arc> { + Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new( + DynamicFilterPhysicalExpr::new(vec![expr], lit(true)), + )))) + } + pub fn to_stream( &self, context: Arc, @@ -113,6 +133,7 @@ impl PartSortExec { input_stream, self.partition_ranges[partition].clone(), partition, + self.filter.clone(), )?) as _; Ok(df_stream) @@ -192,6 +213,51 @@ impl ExecutionPlan for PartSortExec { fn benefits_from_input_partitioning(&self) -> Vec { vec![false] } + + fn cardinality_effect(&self) -> CardinalityEffect { + if self.limit.is_none() { + CardinalityEffect::Equal + } else { + CardinalityEffect::LowerEqual + } + } + + fn gather_filters_for_pushdown( + &self, + phase: FilterPushdownPhase, + parent_filters: Vec>, + _config: &datafusion::config::ConfigOptions, + ) -> datafusion_common::Result { + if !matches!(phase, FilterPushdownPhase::Post) { + return FilterDescription::from_children(parent_filters, &self.children()); + } + + let mut child = ChildFilterDescription::from_child(&parent_filters, &self.input)?; + + if let Some(filter) = &self.filter { + child = child.with_self_filter(filter.read().expr()); + } + + Ok(FilterDescription::new().with_child(child)) + } + + fn reset_state(self: Arc) -> datafusion_common::Result> { + // shared dynamic filter needs to be reset + let new_filter = self + .limit + .is_some() + .then(|| Self::create_filter(self.expression.expr.clone())); + + Ok(Arc::new(Self { + expression: self.expression.clone(), + limit: self.limit, + input: self.input.clone(), + metrics: self.metrics.clone(), + partition_ranges: self.partition_ranges.clone(), + properties: self.properties.clone(), + filter: new_filter, + })) + } } enum PartSortBuffer { @@ -240,11 +306,16 @@ impl PartSortStream { input: DfSendableRecordBatchStream, partition_ranges: Vec, partition: usize, + filter: Option>>, ) -> datafusion_common::Result { let buffer = if let Some(limit) = limit { - let filter = Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new( - DynamicFilterPhysicalExpr::new(vec![], lit(true)), - )))); + let Some(filter) = filter else { + return internal_err!( + "TopKDynamicFilters must be provided when limit is set at {}", + snafu::location!() + ); + }; + PartSortBuffer::Top( TopK::try_new( partition, diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index a60215618b..1dc3982ed2 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -26,6 +26,9 @@ use common_telemetry::warn; use datafusion::error::Result as DfResult; use datafusion::execution::context::TaskContext; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_plan::filter_pushdown::{ + ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, +}; use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, @@ -361,6 +364,16 @@ impl ExecutionPlan for RegionScanExec { fn name(&self) -> &str { "RegionScanExec" } + + fn handle_child_pushdown_result( + &self, + _phase: FilterPushdownPhase, + child_pushdown_result: ChildPushdownResult, + _config: &datafusion::config::ConfigOptions, + ) -> DfResult>> { + // TODO(discord9): use the pushdown result to update the scanner's predicate + Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) + } } impl DisplayAs for RegionScanExec {