From dcf466692c7cca75226deeb1041adb2b08055a9f Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 15 Apr 2026 21:00:20 +0800 Subject: [PATCH] feat: merge scan register dyn filter(not send yet) Signed-off-by: discord9 --- src/query/src/datafusion.rs | 1 + src/query/src/dist_plan/merge_scan.rs | 203 +++++++++++++++++++++++++- 2 files changed, 202 insertions(+), 2 deletions(-) diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index a173f2aee8..a8d60d42f4 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -475,6 +475,7 @@ impl QueryEngine for DatafusionQueryEngine { fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext { let mut state = self.state.session_state(); state.config_mut().set_extension(query_ctx.clone()); + state.config_mut().set_extension(self.state.clone()); // note that hints in "x-greptime-hints" is automatically parsed // and set to query context's extension, so we can get it from query context. if let Some(parallelism) = query_ctx.extension(QUERY_PARALLELISM_HINT) { diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 470b4d325f..98a9a7831c 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -26,6 +26,9 @@ use common_recordbatch::adapter::RecordBatchMetrics; use common_telemetry::tracing_context::TracingContext; use datafusion::execution::{SessionState, TaskContext}; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_plan::filter_pushdown::{ + ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown, +}; use datafusion::physical_plan::metrics::{ Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time, }; @@ -36,8 +39,10 @@ use datafusion::physical_plan::{ }; use datafusion_common::{Column as ColumnExpr, DataFusionError, Result}; use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore}; -use datafusion_physical_expr::expressions::Column; -use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr}; +use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr}; +use datafusion_physical_expr::{ + Distribution, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr, +}; use futures_util::StreamExt; use greptime_proto::v1::region::RegionRequestHeader; use meter_core::data::ReadItem; @@ -48,9 +53,12 @@ use table::table_name::TableName; use tokio::time::Instant; use tracing::{Instrument, Span}; +use super::filter_id::build_remote_dyn_filter_id; use crate::dist_plan::analyzer::AliasMapping; use crate::dist_plan::analyzer::utils::patch_batch_timezone; +use crate::dist_plan::{QueryDynFilterRegistry, Subscriber}; use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS}; +use crate::query_engine::QueryEngineState; use crate::region_query::RegionQueryHandlerRef; #[derive(Debug, Hash, PartialOrd, PartialEq, Eq, Clone)] @@ -132,6 +140,91 @@ impl MergeScanLogicalPlan { } } +#[derive(Debug, Clone)] +struct CapturedRemoteDynFilter { + producer_local_ordinal: usize, + alive_dyn_filter: Arc, +} + +fn capture_remote_dyn_filters( + parent_filters: Vec>, +) -> Vec { + parent_filters + .into_iter() + .enumerate() + .filter_map(|(producer_local_ordinal, filter)| { + downcast_dynamic_filter(filter).map(|alive_dyn_filter| CapturedRemoteDynFilter { + producer_local_ordinal, + alive_dyn_filter, + }) + }) + .collect() +} + +fn downcast_dynamic_filter( + expr: Arc, +) -> Option> { + (expr as Arc) + .downcast::() + .ok() +} + +fn query_engine_state_from_task_context(context: &TaskContext) -> Option> { + let query_engine_state: Option> = + context.session_config().get_extension(); + query_engine_state +} + +fn register_remote_dyn_filters_for_region( + registry: &QueryDynFilterRegistry, + region_id: RegionId, + captured_remote_dyn_filters: &[CapturedRemoteDynFilter], +) { + for captured_dyn_filter in captured_remote_dyn_filters { + let children = captured_dyn_filter + .alive_dyn_filter + .children() + .into_iter() + .cloned() + .collect::>(); + let Ok(filter_id) = build_remote_dyn_filter_id( + region_id, + captured_dyn_filter.producer_local_ordinal, + &children, + ) else { + continue; + }; + + let _ = registry.register_remote_dyn_filter( + filter_id.clone(), + captured_dyn_filter.alive_dyn_filter.clone(), + ); + let _ = registry.register_subscriber(&filter_id, Subscriber::new(region_id)); + } +} + +fn bridge_remote_dyn_filters_for_region( + context: &TaskContext, + query_ctx: &QueryContextRef, + region_id: RegionId, + captured_remote_dyn_filters: &[CapturedRemoteDynFilter], +) { + if captured_remote_dyn_filters.is_empty() { + return; + } + + let Some(query_engine_state) = query_engine_state_from_task_context(context) else { + return; + }; + let Some(registry) = query_engine_state.get_or_init_remote_dyn_filter_registry(query_ctx) + else { + return; + }; + + register_remote_dyn_filters_for_region(®istry, region_id, captured_remote_dyn_filters); +} + +#[derive(Clone)] pub struct MergeScanExec { table: TableName, regions: Vec, @@ -145,6 +238,7 @@ pub struct MergeScanExec { /// Metrics for each partition partition_metrics: Arc>>, query_ctx: QueryContextRef, + captured_remote_dyn_filters: Arc>>, target_partition: usize, partition_cols: AliasMapping, } @@ -243,6 +337,7 @@ impl MergeScanExec { partition_metrics: Arc::default(), properties, query_ctx, + captured_remote_dyn_filters: Arc::default(), target_partition, partition_cols, }) @@ -263,6 +358,7 @@ impl MergeScanExec { let partition_metrics_moved = self.partition_metrics.clone(); let plan = self.plan.clone(); let target_partition = self.target_partition; + let captured_remote_dyn_filters = self.captured_remote_dyn_filters(); let dbname = context.task_id().unwrap_or_default(); let tracing_context = TracingContext::from_json(context.session_id().as_str()); let current_channel = self.query_ctx.channel(); @@ -285,6 +381,13 @@ impl MergeScanExec { .step_by(target_partition) .copied() { + bridge_remote_dyn_filters_for_region( + context.as_ref(), + &query_ctx, + region_id, + &captured_remote_dyn_filters, + ); + let region_span = tracing_context.attach(tracing::info_span!( parent: &Span::current(), "merge_scan_region", @@ -463,11 +566,16 @@ impl MergeScanExec { sub_stage_metrics: self.sub_stage_metrics.clone(), partition_metrics: self.partition_metrics.clone(), query_ctx: self.query_ctx.clone(), + captured_remote_dyn_filters: self.captured_remote_dyn_filters.clone(), target_partition: self.target_partition, partition_cols: self.partition_cols.clone(), }) } + fn captured_remote_dyn_filters(&self) -> Vec { + self.captured_remote_dyn_filters.lock().unwrap().clone() + } + pub fn sub_stage_metrics(&self) -> Vec { self.sub_stage_metrics .lock() @@ -606,6 +714,40 @@ impl ExecutionPlan for MergeScanExec { Ok(self.clone()) } + fn handle_child_pushdown_result( + &self, + _phase: FilterPushdownPhase, + child_pushdown_result: ChildPushdownResult, + _config: &datafusion::config::ConfigOptions, + ) -> Result>> { + let parent_filters = child_pushdown_result + .parent_filters + .into_iter() + .map(|filter| filter.filter) + .collect::>(); + let supported = parent_filters + .iter() + .map(|filter| filter.as_any().is::()) + .collect::>(); + *self.captured_remote_dyn_filters.lock().unwrap() = + capture_remote_dyn_filters(parent_filters); + let new_self = Arc::new(self.clone()); + + Ok(FilterPushdownPropagation { + filters: supported + .into_iter() + .map(|supported| { + if supported { + PushedDown::Yes + } else { + PushedDown::No + } + }) + .collect(), + updated_node: Some(new_self), + }) + } + fn execute( &self, partition: usize, @@ -713,3 +855,60 @@ impl MergeScanMetric { self.greptime_exec_cost.add(metrics); } } + +#[cfg(test)] +mod tests { + use datafusion_physical_expr::expressions::lit; + use session::query_id::QueryId; + use uuid::Uuid; + + use super::*; + + fn test_query_id(value: u128) -> QueryId { + QueryId::from(Uuid::from_u128(value)) + } + + #[test] + fn capture_remote_dyn_filters_preserves_parent_filter_ordinals() { + let parent_filters = vec![ + Arc::new(Column::new("service", 0)) as Arc, + Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::new(Column::new("host", 1)) as Arc<_>], + lit(true) as _, + )) as Arc, + Arc::new(Column::new("zone", 2)) as Arc, + Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::new(Column::new("pod", 3)) as Arc<_>], + lit(true) as _, + )) as Arc, + ]; + + let captured = capture_remote_dyn_filters(parent_filters); + + assert_eq!(captured.len(), 2); + assert_eq!(captured[0].producer_local_ordinal, 1); + assert_eq!(captured[1].producer_local_ordinal, 3); + } + + #[test] + fn register_remote_dyn_filters_for_region_reuses_existing_entry() { + let registry = QueryDynFilterRegistry::new(test_query_id(1)); + let captured_remote_dyn_filters = vec![CapturedRemoteDynFilter { + producer_local_ordinal: 2, + alive_dyn_filter: Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::new(Column::new("host", 0)) as Arc<_>], + lit(true) as _, + )), + }]; + let region_id = RegionId::new(1024, 7); + + register_remote_dyn_filters_for_region(®istry, region_id, &captured_remote_dyn_filters); + register_remote_dyn_filters_for_region(®istry, region_id, &captured_remote_dyn_filters); + + assert_eq!(registry.entry_count(), 1); + let entry = registry.entries().pop().unwrap(); + assert_eq!(entry.filter_id().producer_ordinal(), 2); + assert_eq!(entry.subscribers().len(), 1); + assert_eq!(entry.subscribers()[0].region_id(), region_id); + } +}