feat: part sort provide dyn filter (#7140)

* feat: part sort provide dyn filter

Signed-off-by: discord9 <discord9@163.com>

* fix: reset_state reset dynamic filter

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-10-28 10:44:29 +08:00
committed by GitHub
parent da976e534d
commit 22d9eb6930
2 changed files with 88 additions and 4 deletions

View File

@@ -30,14 +30,18 @@ use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
use datafusion::common::arrow::compute::sort_to_indices;
use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion::execution::{RecordBatchStream, TaskContext};
use datafusion::physical_plan::execution_plan::CardinalityEffect;
use datafusion::physical_plan::filter_pushdown::{
ChildFilterDescription, FilterDescription, FilterPushdownPhase,
};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, TopK,
TopKDynamicFilters,
};
use datafusion_common::{DataFusionError, internal_err};
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use futures::{Stream, StreamExt};
use itertools::Itertools;
use parking_lot::RwLock;
@@ -61,6 +65,10 @@ pub struct PartSortExec {
metrics: ExecutionPlanMetricsSet,
partition_ranges: Vec<Vec<PartitionRange>>,
properties: PlanProperties,
/// Filter matching the state of the sort for dynamic filter pushdown.
/// If `limit` is `Some`, this will also be set and a TopK operator may be used.
/// If `limit` is `None`, this will be `None`.
filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
}
impl PartSortExec {
@@ -79,6 +87,10 @@ impl PartSortExec {
properties.boundedness,
);
let filter = limit
.is_some()
.then(|| Self::create_filter(expression.expr.clone()));
Self {
expression,
limit,
@@ -86,9 +98,17 @@ impl PartSortExec {
metrics,
partition_ranges,
properties,
filter,
}
}
/// Builds a fresh, shareable `TopKDynamicFilters` for the given sort expression.
///
/// This is an associated function: it does not touch any existing
/// `PartSortExec`; callers store the returned handle in the `filter` field.
/// The wrapped `DynamicFilterPhysicalExpr` is created over `expr` with the
/// literal `true` as its initial predicate.
fn create_filter(expr: Arc<dyn PhysicalExpr>) -> Arc<RwLock<TopKDynamicFilters>> {
    let dynamic_expr = DynamicFilterPhysicalExpr::new(vec![expr], lit(true));
    let topk_filters = TopKDynamicFilters::new(Arc::new(dynamic_expr));
    Arc::new(RwLock::new(topk_filters))
}
pub fn to_stream(
&self,
context: Arc<TaskContext>,
@@ -113,6 +133,7 @@ impl PartSortExec {
input_stream,
self.partition_ranges[partition].clone(),
partition,
self.filter.clone(),
)?) as _;
Ok(df_stream)
@@ -192,6 +213,51 @@ impl ExecutionPlan for PartSortExec {
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
vec![false]
}
/// Reports how this node changes the row count of its input.
///
/// Without a limit the reported effect is `Equal`; with a limit it is
/// `LowerEqual`.
fn cardinality_effect(&self) -> CardinalityEffect {
    match self.limit {
        None => CardinalityEffect::Equal,
        Some(_) => CardinalityEffect::LowerEqual,
    }
}
/// Describes which filters this node offers to its input during filter pushdown.
///
/// Outside the `Post` phase the parent filters are handed to
/// `FilterDescription::from_children` as-is. In the `Post` phase the
/// description built for the single input additionally carries this node's
/// own dynamic filter expression, when `self.filter` is `Some`.
fn gather_filters_for_pushdown(
    &self,
    phase: FilterPushdownPhase,
    parent_filters: Vec<Arc<dyn PhysicalExpr>>,
    _config: &datafusion::config::ConfigOptions,
) -> datafusion_common::Result<FilterDescription> {
    if matches!(phase, FilterPushdownPhase::Post) {
        let from_parent = ChildFilterDescription::from_child(&parent_filters, &self.input)?;
        // Attach the dynamic filter only when one exists.
        let input_desc = match &self.filter {
            Some(topk_filter) => from_parent.with_self_filter(topk_filter.read().expr()),
            None => from_parent,
        };
        Ok(FilterDescription::new().with_child(input_desc))
    } else {
        FilterDescription::from_children(parent_filters, &self.children())
    }
}
fn reset_state(self: Arc<Self>) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
// shared dynamic filter needs to be reset
let new_filter = self
.limit
.is_some()
.then(|| Self::create_filter(self.expression.expr.clone()));
Ok(Arc::new(Self {
expression: self.expression.clone(),
limit: self.limit,
input: self.input.clone(),
metrics: self.metrics.clone(),
partition_ranges: self.partition_ranges.clone(),
properties: self.properties.clone(),
filter: new_filter,
}))
}
}
enum PartSortBuffer {
@@ -240,11 +306,16 @@ impl PartSortStream {
input: DfSendableRecordBatchStream,
partition_ranges: Vec<PartitionRange>,
partition: usize,
filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
) -> datafusion_common::Result<Self> {
let buffer = if let Some(limit) = limit {
let filter = Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
DynamicFilterPhysicalExpr::new(vec![], lit(true)),
))));
let Some(filter) = filter else {
return internal_err!(
"TopKDynamicFilters must be provided when limit is set at {}",
snafu::location!()
);
};
PartSortBuffer::Top(
TopK::try_new(
partition,

View File

@@ -26,6 +26,9 @@ use common_telemetry::warn;
use datafusion::error::Result as DfResult;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::filter_pushdown::{
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation,
};
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
@@ -361,6 +364,16 @@ impl ExecutionPlan for RegionScanExec {
fn name(&self) -> &str {
"RegionScanExec"
}
/// Reports the outcome of filter pushdown for this scan node.
///
/// The scanner does not consume the pushed-down filters yet; the result is
/// whatever `FilterPushdownPropagation::if_all` derives from
/// `child_pushdown_result`. Both `_phase` and `_config` are unused.
fn handle_child_pushdown_result(
    &self,
    _phase: FilterPushdownPhase,
    child_pushdown_result: ChildPushdownResult,
    _config: &datafusion::config::ConfigOptions,
) -> DfResult<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
    // TODO(discord9): use the pushdown result to update the scanner's predicate
    let propagation = FilterPushdownPropagation::if_all(child_pushdown_result);
    Ok(propagation)
}
}
impl DisplayAs for RegionScanExec {