feat: merge scan registers dyn filters (not sent yet)

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-04-15 21:00:20 +08:00
parent dd1c100ba4
commit dcf466692c
2 changed files with 202 additions and 2 deletions

View File

@@ -475,6 +475,7 @@ impl QueryEngine for DatafusionQueryEngine {
fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext {
let mut state = self.state.session_state();
state.config_mut().set_extension(query_ctx.clone());
state.config_mut().set_extension(self.state.clone());
// Note that hints in "x-greptime-hints" are automatically parsed
// and set in the query context's extensions, so we can get them from the query context.
if let Some(parallelism) = query_ctx.extension(QUERY_PARALLELISM_HINT) {

View File

@@ -26,6 +26,9 @@ use common_recordbatch::adapter::RecordBatchMetrics;
use common_telemetry::tracing_context::TracingContext;
use datafusion::execution::{SessionState, TaskContext};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::filter_pushdown::{
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
};
use datafusion::physical_plan::metrics::{
Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time,
};
@@ -36,8 +39,10 @@ use datafusion::physical_plan::{
};
use datafusion_common::{Column as ColumnExpr, DataFusionError, Result};
use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr};
use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr};
use datafusion_physical_expr::{
Distribution, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
};
use futures_util::StreamExt;
use greptime_proto::v1::region::RegionRequestHeader;
use meter_core::data::ReadItem;
@@ -48,9 +53,12 @@ use table::table_name::TableName;
use tokio::time::Instant;
use tracing::{Instrument, Span};
use super::filter_id::build_remote_dyn_filter_id;
use crate::dist_plan::analyzer::AliasMapping;
use crate::dist_plan::analyzer::utils::patch_batch_timezone;
use crate::dist_plan::{QueryDynFilterRegistry, Subscriber};
use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
use crate::query_engine::QueryEngineState;
use crate::region_query::RegionQueryHandlerRef;
#[derive(Debug, Hash, PartialOrd, PartialEq, Eq, Clone)]
@@ -132,6 +140,91 @@ impl MergeScanLogicalPlan {
}
}
/// A dynamic filter picked out of the parent filters offered to the merge scan,
/// together with the position it occupied in that parent filter list.
#[derive(Debug, Clone)]
struct CapturedRemoteDynFilter {
/// Index of this filter in the parent filter list it was captured from.
/// It is later passed to `build_remote_dyn_filter_id` as part of the filter id.
producer_local_ordinal: usize,
/// Shared handle to the captured `DynamicFilterPhysicalExpr`.
alive_dyn_filter: Arc<DynamicFilterPhysicalExpr>,
}
/// Collects every parent filter that is a `DynamicFilterPhysicalExpr`.
///
/// Each captured filter remembers its index in `parent_filters`. Filters of any
/// other type are dropped but still occupy an index, so the recorded ordinals
/// are not necessarily contiguous.
fn capture_remote_dyn_filters(
    parent_filters: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
) -> Vec<CapturedRemoteDynFilter> {
    let mut captured = Vec::new();
    for (producer_local_ordinal, filter) in parent_filters.into_iter().enumerate() {
        // Only dynamic filters are of interest; everything else is skipped.
        if let Some(alive_dyn_filter) = downcast_dynamic_filter(filter) {
            captured.push(CapturedRemoteDynFilter {
                producer_local_ordinal,
                alive_dyn_filter,
            });
        }
    }
    captured
}
/// Tries to view a type-erased physical expression as a concrete
/// `DynamicFilterPhysicalExpr`.
///
/// Returns `None` when `expr` is any other expression type.
fn downcast_dynamic_filter(
    expr: Arc<dyn datafusion::physical_plan::PhysicalExpr>,
) -> Option<Arc<DynamicFilterPhysicalExpr>> {
    // Upcast to `dyn Any` first so that `Arc::downcast` becomes available.
    let erased: Arc<dyn Any + Send + Sync + 'static> = expr;
    match erased.downcast::<DynamicFilterPhysicalExpr>() {
        Ok(dyn_filter) => Some(dyn_filter),
        Err(_) => None,
    }
}
/// Fetches the `QueryEngineState` stored as an extension on the task's session
/// config, or `None` when the lookup yields nothing.
fn query_engine_state_from_task_context(context: &TaskContext) -> Option<Arc<QueryEngineState>> {
    context.session_config().get_extension::<QueryEngineState>()
}
/// Registers every captured dynamic filter, plus a subscriber for `region_id`,
/// in `registry`.
///
/// The filter id is built from the region id, the filter's ordinal and the
/// filter's child expressions. Registration is best-effort: a filter whose id
/// cannot be built is skipped, and the results of both registry calls are
/// discarded.
fn register_remote_dyn_filters_for_region(
    registry: &QueryDynFilterRegistry,
    region_id: RegionId,
    captured_remote_dyn_filters: &[CapturedRemoteDynFilter],
) {
    for captured in captured_remote_dyn_filters.iter() {
        let dyn_filter = &captured.alive_dyn_filter;
        let children: Vec<_> = dyn_filter.children().into_iter().cloned().collect();

        let filter_id = match build_remote_dyn_filter_id(
            region_id,
            captured.producer_local_ordinal,
            &children,
        ) {
            Ok(id) => id,
            // No id means nothing to register under; move on to the next filter.
            Err(_) => continue,
        };

        let _ = registry.register_remote_dyn_filter(filter_id.clone(), dyn_filter.clone());
        let _ = registry.register_subscriber(&filter_id, Subscriber::new(region_id));
    }
}
/// Registers the captured dynamic filters for `region_id` in the remote
/// dyn-filter registry obtained for `query_ctx`.
///
/// Does nothing when there are no captured filters, when the task context
/// carries no `QueryEngineState`, or when no registry is returned for the query.
fn bridge_remote_dyn_filters_for_region(
    context: &TaskContext,
    query_ctx: &QueryContextRef,
    region_id: RegionId,
    captured_remote_dyn_filters: &[CapturedRemoteDynFilter],
) {
    if !captured_remote_dyn_filters.is_empty() {
        if let Some(query_engine_state) = query_engine_state_from_task_context(context) {
            if let Some(registry) =
                query_engine_state.get_or_init_remote_dyn_filter_registry(query_ctx)
            {
                register_remote_dyn_filters_for_region(
                    &registry,
                    region_id,
                    captured_remote_dyn_filters,
                );
            }
        }
    }
}
#[derive(Clone)]
pub struct MergeScanExec {
table: TableName,
regions: Vec<RegionId>,
@@ -145,6 +238,7 @@ pub struct MergeScanExec {
/// Metrics for each partition
partition_metrics: Arc<Mutex<HashMap<usize, PartitionMetrics>>>,
query_ctx: QueryContextRef,
captured_remote_dyn_filters: Arc<Mutex<Vec<CapturedRemoteDynFilter>>>,
target_partition: usize,
partition_cols: AliasMapping,
}
@@ -243,6 +337,7 @@ impl MergeScanExec {
partition_metrics: Arc::default(),
properties,
query_ctx,
captured_remote_dyn_filters: Arc::default(),
target_partition,
partition_cols,
})
@@ -263,6 +358,7 @@ impl MergeScanExec {
let partition_metrics_moved = self.partition_metrics.clone();
let plan = self.plan.clone();
let target_partition = self.target_partition;
let captured_remote_dyn_filters = self.captured_remote_dyn_filters();
let dbname = context.task_id().unwrap_or_default();
let tracing_context = TracingContext::from_json(context.session_id().as_str());
let current_channel = self.query_ctx.channel();
@@ -285,6 +381,13 @@ impl MergeScanExec {
.step_by(target_partition)
.copied()
{
bridge_remote_dyn_filters_for_region(
context.as_ref(),
&query_ctx,
region_id,
&captured_remote_dyn_filters,
);
let region_span = tracing_context.attach(tracing::info_span!(
parent: &Span::current(),
"merge_scan_region",
@@ -463,11 +566,16 @@ impl MergeScanExec {
sub_stage_metrics: self.sub_stage_metrics.clone(),
partition_metrics: self.partition_metrics.clone(),
query_ctx: self.query_ctx.clone(),
captured_remote_dyn_filters: self.captured_remote_dyn_filters.clone(),
target_partition: self.target_partition,
partition_cols: self.partition_cols.clone(),
})
}
/// Returns a copy of the dynamic filters currently captured by this node.
///
/// # Panics
///
/// Panics if the lock guarding the captured filters is poisoned.
fn captured_remote_dyn_filters(&self) -> Vec<CapturedRemoteDynFilter> {
    let guard = self.captured_remote_dyn_filters.lock().unwrap();
    guard.clone()
}
pub fn sub_stage_metrics(&self) -> Vec<RecordBatchMetrics> {
self.sub_stage_metrics
.lock()
@@ -606,6 +714,40 @@ impl ExecutionPlan for MergeScanExec {
Ok(self.clone())
}
/// Accepts the dynamic filters offered by the parent and remembers them on
/// this node.
///
/// Each parent filter that is a `DynamicFilterPhysicalExpr` is reported as
/// `PushedDown::Yes`; every other filter is reported as `PushedDown::No`.
/// The captured list on this node is replaced by the dynamic filters from this
/// call. The pushdown phase and the config options are not consulted.
///
/// # Panics
///
/// Panics if the lock guarding the captured filters is poisoned.
fn handle_child_pushdown_result(
    &self,
    _phase: FilterPushdownPhase,
    child_pushdown_result: ChildPushdownResult,
    _config: &datafusion::config::ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
    let parent_filters: Vec<_> = child_pushdown_result
        .parent_filters
        .into_iter()
        .map(|result| result.filter)
        .collect();

    // Decide per filter before the list is consumed by the capture step below.
    let filters: Vec<_> = parent_filters
        .iter()
        .map(|filter| {
            if filter.as_any().is::<DynamicFilterPhysicalExpr>() {
                PushedDown::Yes
            } else {
                PushedDown::No
            }
        })
        .collect();

    let captured = capture_remote_dyn_filters(parent_filters);
    *self.captured_remote_dyn_filters.lock().unwrap() = captured;

    let updated_node: Arc<dyn ExecutionPlan> = Arc::new(self.clone());
    Ok(FilterPushdownPropagation {
        filters,
        updated_node: Some(updated_node),
    })
}
fn execute(
&self,
partition: usize,
@@ -713,3 +855,60 @@ impl MergeScanMetric {
self.greptime_exec_cost.add(metrics);
}
}
#[cfg(test)]
mod tests {
    use datafusion_physical_expr::expressions::lit;
    use session::query_id::QueryId;
    use uuid::Uuid;

    use super::*;

    /// Type-erased physical expression as handed to `capture_remote_dyn_filters`.
    type TestExpr = Arc<dyn datafusion::physical_plan::PhysicalExpr>;

    /// Builds a deterministic query id from a plain integer.
    fn test_query_id(value: u128) -> QueryId {
        QueryId::from(Uuid::from_u128(value))
    }

    /// Builds a column reference expression.
    fn column(name: &str, index: usize) -> TestExpr {
        Arc::new(Column::new(name, index))
    }

    /// Builds a dynamic filter over a single column with a `true` placeholder.
    fn dyn_filter(name: &str, index: usize) -> Arc<DynamicFilterPhysicalExpr> {
        Arc::new(DynamicFilterPhysicalExpr::new(
            vec![column(name, index)],
            lit(true),
        ))
    }

    #[test]
    fn capture_remote_dyn_filters_preserves_parent_filter_ordinals() {
        // Dynamic filters sit at positions 1 and 3, interleaved with plain columns.
        let parent_filters: Vec<TestExpr> = vec![
            column("service", 0),
            dyn_filter("host", 1) as TestExpr,
            column("zone", 2),
            dyn_filter("pod", 3) as TestExpr,
        ];

        let ordinals: Vec<usize> = capture_remote_dyn_filters(parent_filters)
            .iter()
            .map(|captured| captured.producer_local_ordinal)
            .collect();

        assert_eq!(ordinals, vec![1, 3]);
    }

    #[test]
    fn register_remote_dyn_filters_for_region_reuses_existing_entry() {
        let registry = QueryDynFilterRegistry::new(test_query_id(1));
        let region_id = RegionId::new(1024, 7);
        let captured_remote_dyn_filters = vec![CapturedRemoteDynFilter {
            producer_local_ordinal: 2,
            alive_dyn_filter: dyn_filter("host", 0),
        }];

        // Registering the same filter twice must not create a second entry.
        for _ in 0..2 {
            register_remote_dyn_filters_for_region(
                &registry,
                region_id,
                &captured_remote_dyn_filters,
            );
        }

        assert_eq!(registry.entry_count(), 1);
        let entry = registry.entries().pop().unwrap();
        assert_eq!(entry.filter_id().producer_ordinal(), 2);
        let subscribers = entry.subscribers();
        assert_eq!(subscribers.len(), 1);
        assert_eq!(subscribers[0].region_id(), region_id);
    }
}