From 583972b32e9d1e555c246be4128631d18458cd32 Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 2 Jun 2026 21:26:32 +0800 Subject: [PATCH] feat: add initial dyn filter snapshot Signed-off-by: discord9 --- src/common/query/src/request.rs | 5 +- .../request/initial_remote_dyn_filter_reg.rs | 144 ++++- src/query/src/dist_plan/dyn_filter_bridge.rs | 551 ++++++++++++++---- src/query/src/dist_plan/merge_scan.rs | 87 ++- 4 files changed, 654 insertions(+), 133 deletions(-) diff --git a/src/common/query/src/request.rs b/src/common/query/src/request.rs index 186a551052..4e17c2f6ab 100644 --- a/src/common/query/src/request.rs +++ b/src/common/query/src/request.rs @@ -34,8 +34,9 @@ use store_api::storage::RegionId; /// Current wire-format version for remote dynamic filter payload updates. pub use self::initial_remote_dyn_filter_reg::{ - INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, InitialDynFilterReg, - InitialDynFilterRegs, + INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, + INITIAL_REMOTE_DYN_FILTER_REGS_MAX_TOTAL_PROTO_BYTES, InitialDynFilterReg, + InitialDynFilterRegs, InitialDynFilterSnapshot, }; pub const DYN_FILTER_PROTOCOL_VERSION: u32 = 1; diff --git a/src/common/query/src/request/initial_remote_dyn_filter_reg.rs b/src/common/query/src/request/initial_remote_dyn_filter_reg.rs index c9fd920d0d..e2d3d22b05 100644 --- a/src/common/query/src/request/initial_remote_dyn_filter_reg.rs +++ b/src/common/query/src/request/initial_remote_dyn_filter_reg.rs @@ -21,11 +21,18 @@ use datafusion::physical_plan::PhysicalExpr; use datafusion_common::Result as DataFusionResult; use serde::{Deserialize, Serialize}; -use crate::request::{decode_physical_expr_from_bytes, encode_physical_expr_to_bytes}; +use crate::request::{ + DynFilterPayload, decode_physical_expr_from_bytes, encode_physical_expr_to_bytes, +}; pub const INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY: &str = "initial_remote_dyn_filter_registrations"; pub const INITIAL_REMOTE_DYN_FILTER_REGS_MAX_COUNT: usize = 64; +/// Raw encoded registration byte budget for initial remote dynamic filter registrations. +/// +/// This counts DataFusion physical expression proto bytes and optional initial snapshot payload +/// bytes before serde JSON/base64 expansion. It is a payload budget, not an exact final +/// QueryContext extension string-size limit. pub const INITIAL_REMOTE_DYN_FILTER_REGS_MAX_TOTAL_PROTO_BYTES: usize = 64 * 1024; #[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] @@ -43,10 +50,10 @@ impl InitialDynFilterRegs { self.regs.is_empty() } - pub fn total_encoded_child_expr_bytes(&self) -> usize { + pub fn total_encoded_registration_bytes(&self) -> usize { self.regs .iter() - .map(InitialDynFilterReg::encoded_child_expr_bytes) + .map(InitialDynFilterReg::encoded_registration_bytes) .sum() } @@ -70,11 +77,11 @@ impl InitialDynFilterRegs { )); } - let total_proto_bytes = self.total_encoded_child_expr_bytes(); - if total_proto_bytes > max_total_proto_bytes { + let total_registration_bytes = self.total_encoded_registration_bytes(); + if total_registration_bytes > max_total_proto_bytes { return Err(format!( - "InitialDynFilterRegs contains {} total child expr proto bytes, which exceeds the configured limit of {}", - total_proto_bytes, max_total_proto_bytes + "InitialDynFilterRegs contains {} total encoded registration bytes, which exceeds the configured limit of {}", + total_registration_bytes, max_total_proto_bytes )); } @@ -104,6 +111,26 @@ impl InitialDynFilterRegs { pub struct InitialDynFilterReg { pub filter_id: String, pub child_exprs_datafusion_proto: Vec>, + /// Optional producer-side predicate snapshot captured at initial registration time. + /// + /// It is skipped when absent so older/no-snapshot registrations remain wire-compatible. + /// Datanode runtime code should treat this as an update that arrived before the runtime + /// `DynamicFilterPhysicalExpr` was created, not as registration identity metadata. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub initial_snapshot: Option, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct InitialDynFilterSnapshot { + pub payload: DynFilterPayload, + /// Producer-side snapshot generation. Receivers use this with normal update generation + /// ordering to ignore stale payloads and apply newer payloads. + pub generation: u64, + /// Whether this snapshot completes the dynamic filter stream. + /// + /// 3a currently sets this conservatively to false because DataFusion exposes synchronous + /// generation/current reads but not a synchronous public completion getter. + pub is_complete: bool, } impl InitialDynFilterReg { @@ -111,9 +138,15 @@ impl InitialDynFilterReg { Self { filter_id: filter_id.into(), child_exprs_datafusion_proto, + initial_snapshot: None, } } + pub fn with_initial_snapshot(mut self, initial_snapshot: InitialDynFilterSnapshot) -> Self { + self.initial_snapshot = Some(initial_snapshot); + self + } + pub fn from_filter_id_and_children( filter_id: impl Into, children: &[Arc], @@ -130,6 +163,15 @@ impl InitialDynFilterReg { self.child_exprs_datafusion_proto.iter().map(Vec::len).sum() } + pub fn encoded_registration_bytes(&self) -> usize { + self.encoded_child_expr_bytes() + + self + .initial_snapshot + .as_ref() + .map(InitialDynFilterSnapshot::encoded_payload_bytes) + .unwrap_or(0) + } + pub fn decode_children( &self, task_ctx: &TaskContext, @@ -150,6 +192,22 @@ impl InitialDynFilterReg { } } +impl InitialDynFilterSnapshot { + pub fn new(payload: DynFilterPayload, generation: u64, is_complete: bool) -> Self { + Self { + payload, + generation, + is_complete, + } + } + + pub fn encoded_payload_bytes(&self) -> usize { + match &self.payload { + DynFilterPayload::Datafusion(bytes) => bytes.len(), + } + } +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -174,6 +232,59 @@ mod tests { assert_eq!(decoded, regs); } + #[test] + fn initial_dyn_filter_regs_json_round_trip_with_snapshot() { + let regs = InitialDynFilterRegs::new(vec![ + InitialDynFilterReg::new("filter-a", vec![vec![1, 2, 3]]).with_initial_snapshot( + InitialDynFilterSnapshot::new(DynFilterPayload::Datafusion(vec![4, 5, 6]), 7, true), + ), + ]); + + let encoded = regs.to_extension_value().unwrap(); + let decoded = InitialDynFilterRegs::from_extension_value(&encoded).unwrap(); + + assert_eq!(decoded, regs); + assert_eq!( + decoded.regs[0] + .initial_snapshot + .as_ref() + .unwrap() + .generation, + 7 + ); + assert!( + decoded.regs[0] + .initial_snapshot + .as_ref() + .unwrap() + .is_complete + ); + } + + #[test] + fn initial_dyn_filter_reg_json_defaults_missing_snapshot_to_none() { + let decoded = InitialDynFilterRegs::from_extension_value( + r#"{"registrations":[{"filter_id":"filter-a","child_exprs_datafusion_proto":[[1,2,3]]}]}"#, + ) + .unwrap(); + + assert_eq!(decoded.regs.len(), 1); + assert!(decoded.regs[0].initial_snapshot.is_none()); + } + + #[test] + fn initial_dyn_filter_reg_encoded_registration_bytes_include_snapshot_payload() { + let reg = InitialDynFilterReg::new("filter-a", vec![vec![1, 2, 3], vec![4]]) + .with_initial_snapshot(InitialDynFilterSnapshot::new( + DynFilterPayload::Datafusion(vec![5, 6]), + 2, + false, + )); + + assert_eq!(reg.encoded_child_expr_bytes(), 4); + assert_eq!(reg.encoded_registration_bytes(), 6); + } + #[test] fn initial_dyn_filter_regs_validate_bounds_rejects_duplicate_filter_ids() { let regs = InitialDynFilterRegs::new(vec![ @@ -207,7 +318,24 @@ mod tests { let err = regs.validate_bounds(8, 5).unwrap_err(); - assert!(err.contains("6 total child expr proto bytes")); + assert!(err.contains("6 total encoded registration bytes")); + } + + #[test] + fn initial_dyn_filter_regs_validate_bounds_rejects_snapshot_bytes_over_limit() { + let regs = InitialDynFilterRegs::new(vec![ + InitialDynFilterReg::new("filter-a", vec![vec![1]]).with_initial_snapshot( + InitialDynFilterSnapshot::new( + DynFilterPayload::Datafusion(vec![2, 3, 4]), + 2, + false, + ), + ), + ]); + + let err = regs.validate_bounds(8, 3).unwrap_err(); + + assert!(err.contains("4 total encoded registration bytes")); } #[test] diff --git a/src/query/src/dist_plan/dyn_filter_bridge.rs b/src/query/src/dist_plan/dyn_filter_bridge.rs index 41a95b22f1..9b6d74dd51 100644 --- a/src/query/src/dist_plan/dyn_filter_bridge.rs +++ b/src/query/src/dist_plan/dyn_filter_bridge.rs @@ -16,8 +16,9 @@ use std::any::Any; use std::sync::Arc; use common_query::request::{ - INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, InitialDynFilterReg, - InitialDynFilterRegs, + DynFilterPayload, INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, + INITIAL_REMOTE_DYN_FILTER_REGS_MAX_TOTAL_PROTO_BYTES, InitialDynFilterReg, + InitialDynFilterRegs, InitialDynFilterSnapshot, }; use datafusion::execution::TaskContext; use datafusion_common::Result; @@ -32,33 +33,69 @@ use crate::query_engine::QueryEngineState; #[derive(Debug, Clone)] pub(crate) struct CapturedDynFilter { - pub(crate) producer_scope_id: ProducerScopeId, - pub(crate) producer_local_ordinal: usize, + filter_id: FilterId, + initial_registration: InitialDynFilterReg, pub(crate) alive_dyn_filter: Arc, } #[derive(Debug, Clone)] -struct ResolvedDynFilter { - filter_id: FilterId, - alive_dyn_filter: Arc, - children: Vec>, +pub(crate) struct RemoteDynFilterPushdown { + pub(crate) captured_dyn_filters: Vec, + /// Per-parent-filter preflight readiness. `true` means the initial registration can be + /// constructed and serialized; the caller may still choose to return `PushedDown::No` until + /// datanode-side consumption is available. + pub(crate) pushed_down: Vec, } -pub(crate) fn capture_remote_dyn_filters( +pub(crate) fn capture_remote_dyn_filters_for_pushdown( producer_scope_id: ProducerScopeId, parent_filters: Vec>, -) -> Vec { - parent_filters - .into_iter() - .enumerate() - .filter_map(|(producer_local_ordinal, filter)| { - downcast_dynamic_filter(filter).map(|alive_dyn_filter| CapturedDynFilter { - producer_scope_id, - producer_local_ordinal, - alive_dyn_filter, - }) - }) - .collect() +) -> RemoteDynFilterPushdown { + let mut pushed_down = Vec::with_capacity(parent_filters.len()); + let mut captured_dyn_filters = Vec::new(); + + for (producer_local_ordinal, filter) in parent_filters.into_iter().enumerate() { + let Some(alive_dyn_filter) = downcast_dynamic_filter(filter) else { + pushed_down.push(false); + continue; + }; + + match build_captured_dyn_filter(producer_scope_id, producer_local_ordinal, alive_dyn_filter) + { + Ok(captured_dyn_filter) => { + pushed_down.push(true); + captured_dyn_filters.push(captured_dyn_filter); + } + Err(error) => { + common_telemetry::warn!(error; "Remote dyn filter is not pushed down because initial registration cannot be built"); + pushed_down.push(false); + } + } + } + + // Initial snapshots are an optional warm-start optimization. If validation fails while any + // snapshot is attached, retry once without snapshots before rejecting base registrations. This + // broad fallback is intentional: snapshot bytes are the only optional part here, and the second + // validation below still rejects non-snapshot errors such as duplicate ids or count limits. + if has_initial_snapshots(&captured_dyn_filters) + && let Err(error) = validate_initial_registrations_for_pushdown(&captured_dyn_filters) + { + common_telemetry::warn!(error; "Initial remote dyn filter registrations failed validation with optional snapshots; retrying without snapshots"); + drop_initial_snapshots(&mut captured_dyn_filters); + } + + if let Err(error) = validate_initial_registrations_for_pushdown(&captured_dyn_filters) { + common_telemetry::warn!(error; "Remote dyn filters are not pushed down because initial registrations are invalid"); + return RemoteDynFilterPushdown { + captured_dyn_filters: Vec::new(), + pushed_down: vec![false; pushed_down.len()], + }; + } + + RemoteDynFilterPushdown { + captured_dyn_filters, + pushed_down, + } } fn downcast_dynamic_filter( @@ -78,13 +115,13 @@ pub(crate) fn register_dyn_filters_for_region( region_id: RegionId, captured_dyn_filters: &[CapturedDynFilter], ) { - for resolved_filter in resolved_dyn_filters(captured_dyn_filters) { + for captured_dyn_filter in captured_dyn_filters { let _ = registry.register_remote_dyn_filter( - resolved_filter.filter_id.clone(), - resolved_filter.alive_dyn_filter, + captured_dyn_filter.filter_id.clone(), + captured_dyn_filter.alive_dyn_filter.clone(), ); - let _ = - registry.register_subscriber(&resolved_filter.filter_id, Subscriber::new(region_id)); + let _ = registry + .register_subscriber(&captured_dyn_filter.filter_id, Subscriber::new(region_id)); } } @@ -109,51 +146,117 @@ pub(crate) fn bridge_dyn_filters_for_region( register_dyn_filters_for_region(®istry, region_id, captured_dyn_filters); } -fn resolve_dyn_filter(captured_dyn_filter: &CapturedDynFilter) -> Result { - let children = captured_dyn_filter - .alive_dyn_filter +fn build_captured_dyn_filter( + producer_scope_id: ProducerScopeId, + producer_local_ordinal: usize, + alive_dyn_filter: Arc, +) -> Result { + let children = alive_dyn_filter .children() .into_iter() .cloned() .collect::>(); - let filter_id = build_remote_dyn_filter_id( - captured_dyn_filter.producer_scope_id, - captured_dyn_filter.producer_local_ordinal, - &children, - )?; + let filter_id = + build_remote_dyn_filter_id(producer_scope_id, producer_local_ordinal, &children)?; + let initial_registration = + InitialDynFilterReg::from_filter_id_and_children(filter_id.to_string(), &children)?; - Ok(ResolvedDynFilter { + Ok(CapturedDynFilter { filter_id, - alive_dyn_filter: captured_dyn_filter.alive_dyn_filter.clone(), - children, + initial_registration: attach_stable_initial_snapshot( + initial_registration, + &alive_dyn_filter, + ), + alive_dyn_filter, }) } -fn resolved_dyn_filters(captured_dyn_filters: &[CapturedDynFilter]) -> Vec { +fn validate_initial_registrations_for_pushdown( + captured_dyn_filters: &[CapturedDynFilter], +) -> std::result::Result<(), String> { + let regs = build_initial_dyn_filter_regs_for_region(captured_dyn_filters); + regs.validate_default_bounds()?; + regs.to_extension_value() + .map_err(|error| error.to_string())?; + Ok(()) +} + +fn drop_initial_snapshots(captured_dyn_filters: &mut [CapturedDynFilter]) { + for captured in captured_dyn_filters { + captured.initial_registration.initial_snapshot = None; + } +} + +fn has_initial_snapshots(captured_dyn_filters: &[CapturedDynFilter]) -> bool { captured_dyn_filters .iter() - .filter_map(|captured_dyn_filter| resolve_dyn_filter(captured_dyn_filter).ok()) - .collect() + .any(|captured| captured.initial_registration.initial_snapshot.is_some()) +} + +fn attach_stable_initial_snapshot( + initial_registration: InitialDynFilterReg, + alive_dyn_filter: &DynamicFilterPhysicalExpr, +) -> InitialDynFilterReg { + let Some(initial_snapshot) = stable_initial_snapshot(alive_dyn_filter) else { + return initial_registration; + }; + + initial_registration.with_initial_snapshot(initial_snapshot) +} + +fn stable_initial_snapshot( + alive_dyn_filter: &DynamicFilterPhysicalExpr, +) -> Option { + // DataFusion dynamic filters start at generation 1 with a neutral predicate such as `true`. + // Sending that initial value does not improve remote filtering, so only send snapshots after + // a real producer update has advanced the generation. + let generation_before = alive_dyn_filter.snapshot_generation(); + if generation_before <= 1 { + return None; + } + + // Read a stable snapshot by checking the generation before and after `current()`. This relies + // on DataFusion's generation being monotonic for each update; if it changes while reading, + // omit the optional snapshot and let later async updates carry the new predicate. + let current = match alive_dyn_filter.current() { + Ok(current) => current, + Err(error) => { + common_telemetry::warn!(error; "Failed to read remote dyn filter initial snapshot"); + return None; + } + }; + let generation_after = alive_dyn_filter.snapshot_generation(); + if generation_before != generation_after { + return None; + } + + let payload = match DynFilterPayload::from_datafusion_expr( + ¤t, + INITIAL_REMOTE_DYN_FILTER_REGS_MAX_TOTAL_PROTO_BYTES, + ) { + Ok(payload) => payload, + Err(error) => { + common_telemetry::warn!(error; "Failed to encode remote dyn filter initial snapshot"); + return None; + } + }; + + Some(InitialDynFilterSnapshot::new( + payload, + generation_after, + // DataFusion does not expose a synchronous public completion getter. Treat the initial + // snapshot as a non-terminal pending update; later runtime/update handling owns completion. + false, + )) } fn build_initial_dyn_filter_regs_for_region( captured_dyn_filters: &[CapturedDynFilter], ) -> InitialDynFilterRegs { InitialDynFilterRegs::new( - resolved_dyn_filters(captured_dyn_filters) - .into_iter() - .filter_map(|resolved_filter| { - match InitialDynFilterReg::from_filter_id_and_children( - resolved_filter.filter_id.to_string(), - &resolved_filter.children, - ) { - Ok(reg) => Some(reg), - Err(error) => { - common_telemetry::warn!(error; "Failed to encode initial remote dyn filter registration"); - None - } - } - }) + captured_dyn_filters + .iter() + .map(|captured| captured.initial_registration.clone()) .collect(), ) } @@ -189,12 +292,82 @@ pub(crate) fn query_context_with_initial_dyn_filter_regs( #[cfg(test)] mod tests { + use std::fmt; + use std::hash::{Hash, Hasher}; + + use datafusion_common::ScalarValue; + use datafusion_expr::ColumnarValue; use datafusion_physical_expr::expressions::{Column, lit}; use session::query_id::QueryId; use uuid::Uuid; use super::*; + #[derive(Debug)] + struct UnserializableExpr; + + impl fmt::Display for UnserializableExpr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "unserializable_expr") + } + } + + impl Hash for UnserializableExpr { + fn hash(&self, state: &mut H) { + "unserializable_expr".hash(state); + } + } + + impl PartialEq for UnserializableExpr { + fn eq(&self, _other: &Self) -> bool { + true + } + } + + impl Eq for UnserializableExpr {} + + impl datafusion_physical_expr::PhysicalExpr for UnserializableExpr { + fn as_any(&self) -> &dyn Any { + self + } + + fn data_type( + &self, + _input_schema: &arrow_schema::Schema, + ) -> datafusion_common::Result { + Ok(arrow_schema::DataType::Boolean) + } + + fn nullable( + &self, + _input_schema: &arrow_schema::Schema, + ) -> datafusion_common::Result { + Ok(false) + } + + fn evaluate( + &self, + _batch: &common_recordbatch::DfRecordBatch, + ) -> datafusion_common::Result { + Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(true)))) + } + + fn children(&self) -> Vec<&Arc> { + Vec::new() + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> datafusion_common::Result> { + Ok(self) + } + + fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{self}") + } + } + fn test_query_id(value: u128) -> QueryId { QueryId::from(Uuid::from_u128(value)) } @@ -203,8 +376,40 @@ mod tests { ProducerScopeId::new(value) } + fn test_captured_dyn_filter( + producer_scope_id: ProducerScopeId, + producer_local_ordinal: usize, + column_name: &str, + column_index: usize, + ) -> CapturedDynFilter { + build_captured_dyn_filter( + producer_scope_id, + producer_local_ordinal, + Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::new(Column::new(column_name, column_index)) as Arc<_>], + lit(true) as _, + )), + ) + .unwrap() + } + + fn test_dyn_filter_with_snapshot_payload( + column_name: &str, + column_index: usize, + payload_bytes: usize, + ) -> Arc { + let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::new(Column::new(column_name, column_index)) as Arc<_>], + lit(true) as _, + )); + dyn_filter + .update(lit(ScalarValue::Utf8(Some("x".repeat(payload_bytes)))) as _) + .unwrap(); + dyn_filter + } + #[test] - fn capture_remote_dyn_filters_preserves_parent_filter_ordinals() { + fn capture_remote_dyn_filters_for_pushdown_preserves_parent_filter_ordinals() { let parent_filters = vec![ Arc::new(Column::new("service", 0)) as Arc, Arc::new(DynamicFilterPhysicalExpr::new( @@ -219,26 +424,182 @@ mod tests { ]; let producer_scope_id = test_producer_scope(42); - let captured = capture_remote_dyn_filters(producer_scope_id, parent_filters); + let captured = capture_remote_dyn_filters_for_pushdown(producer_scope_id, parent_filters) + .captured_dyn_filters; assert_eq!(captured.len(), 2); - assert_eq!(captured[0].producer_scope_id, producer_scope_id); - assert_eq!(captured[1].producer_scope_id, producer_scope_id); - assert_eq!(captured[0].producer_local_ordinal, 1); - assert_eq!(captured[1].producer_local_ordinal, 3); + assert_eq!(captured[0].filter_id.producer_scope_id(), producer_scope_id); + assert_eq!(captured[1].filter_id.producer_scope_id(), producer_scope_id); + assert_eq!(captured[0].filter_id.producer_ordinal(), 1); + assert_eq!(captured[1].filter_id.producer_ordinal(), 3); + } + + #[test] + fn capture_remote_dyn_filters_for_pushdown_marks_only_valid_initial_regs() { + let parent_filters = vec![ + Arc::new(Column::new("service", 0)) as Arc, + Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::new(Column::new("host", 1)) as Arc<_>], + lit(true) as _, + )) as Arc, + Arc::new(Column::new("zone", 2)) as Arc, + ]; + + let producer_scope_id = test_producer_scope(42); + let pushdown = capture_remote_dyn_filters_for_pushdown(producer_scope_id, parent_filters); + + assert_eq!(pushdown.pushed_down, vec![false, true, false]); + assert_eq!(pushdown.captured_dyn_filters.len(), 1); + assert_eq!( + pushdown.captured_dyn_filters[0] + .filter_id + .producer_scope_id(), + producer_scope_id + ); + assert_eq!( + pushdown.captured_dyn_filters[0] + .filter_id + .producer_ordinal(), + 1 + ); + assert!( + pushdown.captured_dyn_filters[0] + .initial_registration + .initial_snapshot + .is_none() + ); + } + + #[test] + fn capture_remote_dyn_filters_for_pushdown_rejects_unencodable_registration() { + let parent_filters = vec![Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::new(UnserializableExpr) as Arc<_>], + lit(true) as _, + )) + as Arc]; + + let pushdown = + capture_remote_dyn_filters_for_pushdown(test_producer_scope(42), parent_filters); + + assert_eq!(pushdown.pushed_down, vec![false]); + assert!(pushdown.captured_dyn_filters.is_empty()); + } + + #[test] + fn capture_remote_dyn_filters_for_pushdown_omits_neutral_initial_snapshot() { + let parent_filters = vec![Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::new(Column::new("host", 1)) as Arc<_>], + lit(true) as _, + )) + as Arc]; + + let pushdown = + capture_remote_dyn_filters_for_pushdown(test_producer_scope(42), parent_filters); + + assert_eq!(pushdown.pushed_down, vec![true]); + assert!( + pushdown.captured_dyn_filters[0] + .initial_registration + .initial_snapshot + .is_none() + ); + } + + #[test] + fn capture_remote_dyn_filters_for_pushdown_attaches_stable_initial_snapshot() { + let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::new(Column::new("host", 1)) as Arc<_>], + lit(true) as _, + )); + dyn_filter.update(lit(false) as _).unwrap(); + let parent_filters = vec![dyn_filter as Arc]; + + let pushdown = + capture_remote_dyn_filters_for_pushdown(test_producer_scope(42), parent_filters); + + assert_eq!(pushdown.pushed_down, vec![true]); + let snapshot = pushdown.captured_dyn_filters[0] + .initial_registration + .initial_snapshot + .as_ref() + .unwrap(); + assert_eq!(snapshot.generation, 2); + assert!(!snapshot.is_complete); + assert!(matches!( + snapshot.payload, + DynFilterPayload::Datafusion(ref bytes) if !bytes.is_empty() + )); + } + + #[test] + fn capture_remote_dyn_filters_for_pushdown_drops_oversized_snapshots_and_keeps_regs() { + let parent_filters = vec![ + test_dyn_filter_with_snapshot_payload("host", 0, 40 * 1024) + as Arc, + test_dyn_filter_with_snapshot_payload("pod", 1, 40 * 1024) + as Arc, + ]; + + let pushdown = + capture_remote_dyn_filters_for_pushdown(test_producer_scope(42), parent_filters); + + assert_eq!(pushdown.pushed_down, vec![true, true]); + assert_eq!(pushdown.captured_dyn_filters.len(), 2); + assert!( + pushdown + .captured_dyn_filters + .iter() + .all(|captured| { captured.initial_registration.initial_snapshot.is_none() }) + ); + } + + #[test] + fn capture_remote_dyn_filters_for_pushdown_rejects_regs_still_invalid_after_snapshot_drop() { + const TOO_MANY_INITIAL_REGS: usize = 65; + + let parent_filters = (0..TOO_MANY_INITIAL_REGS) + .map(|ordinal| { + test_dyn_filter_with_snapshot_payload(&format!("host_{ordinal}"), ordinal, 1) + as Arc + }) + .collect::>(); + + let pushdown = + capture_remote_dyn_filters_for_pushdown(test_producer_scope(42), parent_filters); + + assert!(pushdown.captured_dyn_filters.is_empty()); + assert_eq!(pushdown.pushed_down, vec![false; TOO_MANY_INITIAL_REGS]); + } + + #[test] + fn capture_remote_dyn_filters_for_pushdown_rejects_regs_exceeding_bounds() { + const TOO_MANY_INITIAL_REGS: usize = 65; + + let parent_filters = (0..TOO_MANY_INITIAL_REGS) + .map(|_| { + Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::new(Column::new("host", 0)) as Arc<_>], + lit(true) as _, + )) as Arc + }) + .collect::>(); + + let pushdown = + capture_remote_dyn_filters_for_pushdown(test_producer_scope(42), parent_filters); + + assert!(pushdown.captured_dyn_filters.is_empty()); + assert_eq!(pushdown.pushed_down, vec![false; TOO_MANY_INITIAL_REGS]); } #[test] fn register_dyn_filters_for_region_reuses_existing_entry() { let registry = QueryDynFilterRegistry::new(test_query_id(1)); - let captured_dyn_filters = vec![CapturedDynFilter { - producer_scope_id: test_producer_scope(42), - producer_local_ordinal: 2, - alive_dyn_filter: Arc::new(DynamicFilterPhysicalExpr::new( - vec![Arc::new(Column::new("host", 0)) as Arc<_>], - lit(true) as _, - )), - }]; + let captured_dyn_filters = vec![test_captured_dyn_filter( + test_producer_scope(42), + 2, + "host", + 0, + )]; let first_region_id = RegionId::new(1024, 7); let second_region_id = RegionId::new(1024, 8); @@ -270,14 +631,8 @@ mod tests { fn register_dyn_filters_for_region_keeps_independent_producer_scopes_distinct() { let registry = QueryDynFilterRegistry::new(test_query_id(1)); let region_id = RegionId::new(1024, 7); - let make_filter = |producer_scope_id| CapturedDynFilter { - producer_scope_id, - producer_local_ordinal: 2, - alive_dyn_filter: Arc::new(DynamicFilterPhysicalExpr::new( - vec![Arc::new(Column::new("host", 0)) as Arc<_>], - lit(true) as _, - )), - }; + let make_filter = + |producer_scope_id| test_captured_dyn_filter(producer_scope_id, 2, "host", 0); register_dyn_filters_for_region( ®istry, @@ -295,14 +650,12 @@ mod tests { #[test] fn query_context_includes_region_initial_dyn_filter_regs() { - let captured_dyn_filters = vec![CapturedDynFilter { - producer_scope_id: test_producer_scope(42), - producer_local_ordinal: 2, - alive_dyn_filter: Arc::new(DynamicFilterPhysicalExpr::new( - vec![Arc::new(Column::new("host", 0)) as Arc<_>], - lit(true) as _, - )), - }]; + let captured_dyn_filters = vec![test_captured_dyn_filter( + test_producer_scope(42), + 2, + "host", + 0, + )]; let region_id = RegionId::new(1024, 7); let query_ctx = QueryContext::arc(); @@ -326,15 +679,11 @@ mod tests { 1024, ) .unwrap(); - let expected_filter_id = build_remote_dyn_filter_id( - captured_dyn_filters[0].producer_scope_id, - captured_dyn_filters[0].producer_local_ordinal, - &[Arc::new(Column::new("host", 0)) as Arc<_>], - ) - .unwrap(); - assert_eq!(regs.regs.len(), 1); - assert_eq!(regs.regs[0].filter_id, expected_filter_id.to_string()); + assert_eq!( + regs.regs[0].filter_id, + captured_dyn_filters[0].filter_id.to_string() + ); assert_eq!(decoded_children.len(), 1); assert!(decoded_children[0].as_any().is::()); } @@ -342,22 +691,8 @@ mod tests { #[test] fn query_context_drops_initial_regs_when_duplicate_filter_ids_exceed_bounds() { let captured_dyn_filters = vec![ - CapturedDynFilter { - producer_scope_id: test_producer_scope(42), - producer_local_ordinal: 2, - alive_dyn_filter: Arc::new(DynamicFilterPhysicalExpr::new( - vec![Arc::new(Column::new("host", 0)) as Arc<_>], - lit(true) as _, - )), - }, - CapturedDynFilter { - producer_scope_id: test_producer_scope(42), - producer_local_ordinal: 2, - alive_dyn_filter: Arc::new(DynamicFilterPhysicalExpr::new( - vec![Arc::new(Column::new("host", 0)) as Arc<_>], - lit(true) as _, - )), - }, + test_captured_dyn_filter(test_producer_scope(42), 2, "host", 0), + test_captured_dyn_filter(test_producer_scope(42), 2, "host", 0), ]; let region_id = RegionId::new(1024, 7); let query_ctx = QueryContext::arc(); diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 30fd7cba04..c148f7baff 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -39,7 +39,7 @@ use datafusion::physical_plan::{ }; use datafusion_common::{Column as ColumnExpr, DataFusionError, Result}; use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore}; -use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr}; +use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr}; use futures_util::StreamExt; use greptime_proto::v1::region::RegionRequestHeader; @@ -54,7 +54,7 @@ use tracing::{Instrument, Span}; use crate::dist_plan::analyzer::AliasMapping; use crate::dist_plan::analyzer::utils::patch_batch_timezone; use crate::dist_plan::dyn_filter_bridge::{ - CapturedDynFilter, bridge_dyn_filters_for_region, capture_remote_dyn_filters, + CapturedDynFilter, bridge_dyn_filters_for_region, capture_remote_dyn_filters_for_pushdown, query_context_with_initial_dyn_filter_regs, }; use crate::dist_plan::{ProducerScopeId, RemoteDynFilterRegistryLease}; @@ -715,23 +715,25 @@ impl ExecutionPlan for MergeScanExec { .into_iter() .map(|filter| filter.filter) .collect::>(); - let supported = parent_filters - .iter() - .map(|filter| filter.as_any().is::()) - .collect::>(); + let remote_dyn_filter_pushdown = capture_remote_dyn_filters_for_pushdown( + self.remote_dyn_filter_producer_scope_id, + parent_filters, + ); *self.captured_remote_dyn_filters.lock().unwrap() = - capture_remote_dyn_filters(self.remote_dyn_filter_producer_scope_id, parent_filters); + remote_dyn_filter_pushdown.captured_dyn_filters; let new_self = Arc::new(self.clone()); Ok(FilterPushdownPropagation { - filters: supported + filters: remote_dyn_filter_pushdown + .pushed_down .into_iter() - .map(|supported| { - if supported { - PushedDown::Yes - } else { - PushedDown::No - } + .map(|_pushdown_ready| { + // TODO(remote-dyn-filter): Return `PushedDown::Yes` for `_pushdown_ready` + // filters after datanode-side initial registration consumption and runtime + // pending-update application are implemented. Until then, keep the parent-side + // filter as a correctness fallback because the remote side may ignore the + // registration carried in QueryContext. + PushedDown::No }) .collect(), updated_node: Some(new_self), @@ -851,11 +853,15 @@ mod tests { use std::collections::BTreeSet; use async_trait::async_trait; + use datafusion::config::ConfigOptions; use datafusion::execution::SessionStateBuilder; + use datafusion::physical_plan::filter_pushdown::ChildFilterPushdownResult; use datafusion_common::TableReference; use datafusion_expr::{LogicalPlanBuilder, lit}; use datafusion_physical_expr::Distribution; - use datafusion_physical_expr::expressions::Column; + use datafusion_physical_expr::expressions::{ + Column, DynamicFilterPhysicalExpr, lit as physical_lit, + }; use session::ReadPreference; use session::context::QueryContext; use session::query_id::QueryId; @@ -997,4 +1003,55 @@ mod tests { "try_with_new_distribution must preserve producer scope id" ); } + + #[test] + fn remote_dyn_filter_preflight_keeps_parent_filter_until_dn_runtime_is_ready() { + let producer_scope_id = ProducerScopeId::new(42); + let plan = LogicalPlanBuilder::empty(true) + .project(vec![lit(1i32).alias("col1")]) + .unwrap() + .build() + .unwrap(); + + let schema = plan.schema().as_arrow().clone(); + let table = TableName::new("catalog", "schema", "table"); + let regions = vec![RegionId::new(1024, 1)]; + let query_ctx = QueryContext::arc(); + let session_state = SessionStateBuilder::new().build(); + let handler = Arc::new(TestRegionQueryHandler); + let exec = MergeScanExec::new( + &session_state, + table, + regions, + plan, + &schema, + handler, + query_ctx, + 1, + AliasMapping::new(), + producer_scope_id, + ) + .unwrap(); + let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::new(Column::new("host", 0)) as Arc<_>], + physical_lit(true) as _, + )) as Arc; + + let propagation = exec + .handle_child_pushdown_result( + FilterPushdownPhase::Post, + ChildPushdownResult { + parent_filters: vec![ChildFilterPushdownResult { + filter: dyn_filter, + child_results: vec![PushedDown::Yes], + }], + self_filters: Vec::new(), + }, + &ConfigOptions::new(), + ) + .unwrap(); + + assert_eq!(exec.captured_remote_dyn_filters().len(), 1); + assert!(matches!(propagation.filters.as_slice(), [PushedDown::No])); + } }