From bbf289d89ff93e698ddd9114fa1013e3ee0eef25 Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 3 Jun 2026 21:12:28 +0800 Subject: [PATCH] refactor: per review Signed-off-by: discord9 --- src/common/query/src/request.rs | 27 +- src/common/query/src/request/base64_serde.rs | 78 +++++ .../request/initial_remote_dyn_filter_reg.rs | 21 +- src/query/src/dist_plan.rs | 2 +- src/query/src/dist_plan/analyzer.rs | 145 ++++---- src/query/src/dist_plan/analyzer/fallback.rs | 23 +- src/query/src/dist_plan/analyzer/test.rs | 109 +++--- src/query/src/dist_plan/dyn_filter_bridge.rs | 141 ++++---- src/query/src/dist_plan/filter_id.rs | 108 +++--- src/query/src/dist_plan/merge_scan.rs | 109 +++--- src/query/src/dist_plan/planner.rs | 2 +- .../dist_plan/remote_dyn_filter_registry.rs | 327 +++++++++++++++--- src/query/src/metrics.rs | 4 +- src/query/src/optimizer/pass_distribution.rs | 4 +- src/query/src/query_engine/state.rs | 41 ++- 15 files changed, 733 insertions(+), 408 deletions(-) create mode 100644 src/common/query/src/request/base64_serde.rs diff --git a/src/common/query/src/request.rs b/src/common/query/src/request.rs index 4e17c2f6ab..121292709f 100644 --- a/src/common/query/src/request.rs +++ b/src/common/query/src/request.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod base64_serde; mod initial_remote_dyn_filter_reg; use std::sync::Arc; @@ -56,31 +57,7 @@ pub const DYN_FILTER_PROTOCOL_VERSION: u32 = 1; pub enum DynFilterPayload { /// A serialized DataFusion [`PhysicalExpr`] encoded as a protobuf /// [`PhysicalExprNode`]. - Datafusion(#[serde(with = "base64_bytes")] Vec), -} - -mod base64_bytes { - use base64::Engine; - use base64::prelude::BASE64_STANDARD; - use serde::de::Error; - use serde::{Deserialize, Deserializer, Serializer}; - - pub fn serialize(bytes: &[u8], serializer: S) -> Result - where - S: Serializer, - { - serializer.serialize_str(&BASE64_STANDARD.encode(bytes)) - } - - pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> - where - D: Deserializer<'de>, - { - let encoded = String::deserialize(deserializer)?; - BASE64_STANDARD.decode(encoded).map_err(|err| { - D::Error::custom(format!("invalid base64 dynamic filter payload: {err}")) - }) - } + Datafusion(#[serde(with = "base64_serde::bytes")] Vec), } impl DynFilterPayload { diff --git a/src/common/query/src/request/base64_serde.rs b/src/common/query/src/request/base64_serde.rs new file mode 100644 index 0000000000..9de0db2f1e --- /dev/null +++ b/src/common/query/src/request/base64_serde.rs @@ -0,0 +1,78 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Serde adapters for byte fields encoded as base64 strings in JSON. + +use base64::Engine; +use base64::prelude::BASE64_STANDARD; + +fn encode(bytes: &[u8]) -> String { + BASE64_STANDARD.encode(bytes) +} + +fn decode(encoded: &str) -> Result, base64::DecodeError> { + BASE64_STANDARD.decode(encoded) +} + +pub(crate) mod bytes { + use serde::de::Error; + use serde::{Deserialize, Deserializer, Serializer}; + + pub fn serialize(value: &[u8], serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&super::encode(value)) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> + where + D: Deserializer<'de>, + { + let encoded = String::deserialize(deserializer)?; + super::decode(&encoded).map_err(|err| { + D::Error::custom(format!("invalid base64 dynamic filter payload: {err}")) + }) + } +} + +pub(crate) mod bytes_vec { + use serde::de::Error; + use serde::{Deserialize, Deserializer, Serialize, Serializer}; + + pub fn serialize(values: &[Vec], serializer: S) -> Result + where + S: Serializer, + { + values + .iter() + .map(|bytes| super::encode(bytes)) + .collect::>() + .serialize(serializer) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result>, D::Error> + where + D: Deserializer<'de>, + { + Vec::::deserialize(deserializer)? + .into_iter() + .map(|encoded| { + super::decode(&encoded).map_err(|error| { + D::Error::custom(format!("invalid base64 bytes vector item: {error}")) + }) + }) + .collect() + } +} diff --git a/src/common/query/src/request/initial_remote_dyn_filter_reg.rs b/src/common/query/src/request/initial_remote_dyn_filter_reg.rs index e2d3d22b05..01f193384b 100644 --- a/src/common/query/src/request/initial_remote_dyn_filter_reg.rs +++ b/src/common/query/src/request/initial_remote_dyn_filter_reg.rs @@ -110,6 +110,7 @@ impl InitialDynFilterRegs { #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct InitialDynFilterReg { pub filter_id: String, + #[serde(with = "super::base64_serde::bytes_vec")] pub child_exprs_datafusion_proto: Vec>, /// Optional producer-side predicate snapshot captured at initial registration time. /// @@ -227,8 +228,17 @@ mod tests { ]); let encoded = regs.to_extension_value().unwrap(); + let json: serde_json::Value = serde_json::from_str(&encoded).unwrap(); let decoded = InitialDynFilterRegs::from_extension_value(&encoded).unwrap(); + assert_eq!( + json["registrations"][0]["child_exprs_datafusion_proto"], + serde_json::json!(["AQID"]) + ); + assert_eq!( + json["registrations"][1]["child_exprs_datafusion_proto"], + serde_json::json!(["BAU="]) + ); assert_eq!(decoded, regs); } @@ -241,8 +251,17 @@ mod tests { ]); let encoded = regs.to_extension_value().unwrap(); + let json: serde_json::Value = serde_json::from_str(&encoded).unwrap(); let decoded = InitialDynFilterRegs::from_extension_value(&encoded).unwrap(); + assert_eq!( + json["registrations"][0]["child_exprs_datafusion_proto"], + serde_json::json!(["AQID"]) + ); + assert_eq!( + json["registrations"][0]["initial_snapshot"]["payload"], + serde_json::json!({"kind":"datafusion","payload":"BAUG"}) + ); assert_eq!(decoded, regs); assert_eq!( decoded.regs[0] @@ -264,7 +283,7 @@ mod tests { #[test] fn initial_dyn_filter_reg_json_defaults_missing_snapshot_to_none() { let decoded = InitialDynFilterRegs::from_extension_value( - r#"{"registrations":[{"filter_id":"filter-a","child_exprs_datafusion_proto":[[1,2,3]]}]}"#, + r#"{"registrations":[{"filter_id":"filter-a","child_exprs_datafusion_proto":["AQID"]}]}"#, ) .unwrap(); diff --git a/src/query/src/dist_plan.rs b/src/query/src/dist_plan.rs index f1f5196fcf..084ca7d1e5 100644 --- a/src/query/src/dist_plan.rs +++ b/src/query/src/dist_plan.rs @@ -24,7 +24,7 @@ mod region_pruner; mod remote_dyn_filter_registry; pub use analyzer::{DistPlannerAnalyzer, DistPlannerOptions}; -pub use filter_id::{FilterFingerprint, FilterId, ParseFilterIdError, ProducerScopeId}; +pub use filter_id::{FilterFingerprint, FilterId, ParseFilterIdError, RemoteDynFilterProducerId}; pub use merge_scan::{MergeScanExec, MergeScanLogicalPlan}; pub use planner::{DistExtensionPlanner, MergeSortExtensionPlanner}; pub use predicate_extractor::PredicateExtractor; diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs index f757e41eea..ba0d5a70c0 100644 --- a/src/query/src/dist_plan/analyzer.rs +++ b/src/query/src/dist_plan/analyzer.rs @@ -14,7 +14,6 @@ use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::sync::Arc; -use std::sync::atomic::{AtomicU64, Ordering}; use common_telemetry::debug; use datafusion::config::{ConfigExtension, ExtensionOptions}; @@ -32,7 +31,7 @@ use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::metadata::TableType; use table::table::adapter::DfTableProviderAdapter; -use crate::dist_plan::ProducerScopeId; +use crate::dist_plan::RemoteDynFilterProducerId; use crate::dist_plan::analyzer::utils::{ PatchOptimizerContext, PlanTreeExpressionSimplifier, aliased_columns_for, rewrite_merge_sort_exprs, @@ -124,15 +123,14 @@ impl AnalyzerRule for DistPlannerAnalyzer { let opt = config.extensions.get::(); let allow_fallback = opt.map(|o| o.allow_query_fallback).unwrap_or(false); - let producer_scope_allocator = ProducerScopeAllocator::default(); - let result = match self.try_push_down(plan.clone(), producer_scope_allocator.clone()) { + let result = match self.try_push_down(plan.clone()) { Ok(plan) => plan, Err(err) => { if allow_fallback { common_telemetry::warn!(err; "Failed to push down plan, using fallback plan rewriter for plan: {plan}"); // if push down failed, use fallback plan rewriter PUSH_DOWN_FALLBACK_ERRORS_TOTAL.inc(); - self.use_fallback(plan, producer_scope_allocator)? + self.use_fallback(plan)? } else { return Err(err); } @@ -145,37 +143,21 @@ impl AnalyzerRule for DistPlannerAnalyzer { impl DistPlannerAnalyzer { /// Try push down as many nodes as possible - fn try_push_down( - &self, - plan: LogicalPlan, - producer_scope_allocator: ProducerScopeAllocator, - ) -> DfResult { - let plan = plan.transform(|plan| { - Self::inspect_plan_with_subquery(plan, producer_scope_allocator.clone()) - })?; - let mut rewriter = PlanRewriter::new(producer_scope_allocator); + fn try_push_down(&self, plan: LogicalPlan) -> DfResult { + let plan = plan.transform(&Self::inspect_plan_with_subquery)?; + let mut rewriter = PlanRewriter::default(); let result = plan.data.rewrite(&mut rewriter)?.data; - Ok(result) + Self::assign_merge_scan_remote_dyn_filter_producer_ids(result) } /// Use fallback plan rewriter to rewrite the plan and only push down table scan nodes - fn use_fallback( - &self, - plan: LogicalPlan, - producer_scope_allocator: ProducerScopeAllocator, - ) -> DfResult { - let plan = plan.transform(|plan| { - Self::inspect_plan_with_subquery(plan, producer_scope_allocator.clone()) - })?; - let mut rewriter = fallback::FallbackPlanRewriter::new(producer_scope_allocator); - let result = plan.data.rewrite(&mut rewriter)?.data; - Ok(result) + fn use_fallback(&self, plan: LogicalPlan) -> DfResult { + let mut rewriter = fallback::FallbackPlanRewriter; + let result = plan.rewrite(&mut rewriter)?.data; + Self::assign_merge_scan_remote_dyn_filter_producer_ids(result) } - fn inspect_plan_with_subquery( - plan: LogicalPlan, - producer_scope_allocator: ProducerScopeAllocator, - ) -> DfResult> { + fn inspect_plan_with_subquery(plan: LogicalPlan) -> DfResult> { // Workaround for https://github.com/GreptimeTeam/greptimedb/issues/5469 and https://github.com/GreptimeTeam/greptimedb/issues/5799 // FIXME(yingwen): Remove the `Limit` plan once we update DataFusion. if let LogicalPlan::Limit(_) | LogicalPlan::Distinct(_) = &plan { @@ -185,10 +167,7 @@ impl DistPlannerAnalyzer { let exprs = plan .expressions_consider_join() .into_iter() - .map(|e| { - e.transform(|expr| Self::transform_subquery(expr, producer_scope_allocator.clone())) - .map(|x| x.data) - }) + .map(|e| e.transform(&Self::transform_subquery).map(|x| x.data)) .collect::>>()?; // Some plans that are special treated (should not call `with_new_exprs` on them) @@ -200,37 +179,33 @@ impl DistPlannerAnalyzer { } } - fn transform_subquery( - expr: Expr, - producer_scope_allocator: ProducerScopeAllocator, - ) -> DfResult> { + fn transform_subquery(expr: Expr) -> DfResult> { match expr { Expr::Exists(exists) => Ok(Transformed::yes(Expr::Exists(Exists { - subquery: Self::handle_subquery(exists.subquery, producer_scope_allocator)?, + subquery: Self::handle_subquery(exists.subquery)?, negated: exists.negated, }))), Expr::InSubquery(in_subquery) => Ok(Transformed::yes(Expr::InSubquery(InSubquery { expr: in_subquery.expr, - subquery: Self::handle_subquery(in_subquery.subquery, producer_scope_allocator)?, + subquery: Self::handle_subquery(in_subquery.subquery)?, negated: in_subquery.negated, }))), Expr::ScalarSubquery(scalar_subquery) => Ok(Transformed::yes(Expr::ScalarSubquery( - Self::handle_subquery(scalar_subquery, producer_scope_allocator)?, + Self::handle_subquery(scalar_subquery)?, ))), _ => Ok(Transformed::no(expr)), } } - fn handle_subquery( - subquery: Subquery, - producer_scope_allocator: ProducerScopeAllocator, - ) -> DfResult { - let subquery_plan = subquery.subquery.as_ref().clone().transform(|plan| { - Self::inspect_plan_with_subquery(plan, producer_scope_allocator.clone()) - })?; - let mut rewriter = PlanRewriter::new(producer_scope_allocator); - let mut rewrote_subquery = subquery_plan.data.rewrite(&mut rewriter)?.data; + fn handle_subquery(subquery: Subquery) -> DfResult { + let mut rewriter = PlanRewriter::default(); + let mut rewrote_subquery = subquery + .subquery + .as_ref() + .clone() + .rewrite(&mut rewriter)? + .data; // Workaround. DF doesn't support the first plan in subquery to be an Extension if matches!(rewrote_subquery, LogicalPlan::Extension(_)) { let output_schema = rewrote_subquery.schema().clone(); @@ -250,20 +225,56 @@ impl DistPlannerAnalyzer { spans: Default::default(), }) } + + fn assign_merge_scan_remote_dyn_filter_producer_ids( + plan: LogicalPlan, + ) -> DfResult { + let mut assigner = MergeScanRemoteDynFilterProducerIdAssigner::default(); + Ok(plan.rewrite_with_subqueries(&mut assigner)?.data) + } } -#[derive(Debug, Clone, Default)] -pub(super) struct ProducerScopeAllocator { - next_remote_dyn_filter_producer_scope_id: Arc, +#[derive(Debug, Default)] +struct RemoteDynFilterProducerIdAllocator { + next_remote_dyn_filter_producer_id: u64, } -impl ProducerScopeAllocator { - fn allocate(&self) -> ProducerScopeId { - ProducerScopeId::new( - self.next_remote_dyn_filter_producer_scope_id - .fetch_add(1, Ordering::SeqCst) - + 1, - ) +impl RemoteDynFilterProducerIdAllocator { + fn allocate(&mut self) -> RemoteDynFilterProducerId { + self.next_remote_dyn_filter_producer_id += 1; + RemoteDynFilterProducerId::new(self.next_remote_dyn_filter_producer_id) + } +} + +/// Assigns query-local RDF producer ids to visible `MergeScan` nodes after plan rewriting. +#[derive(Debug, Default)] +struct MergeScanRemoteDynFilterProducerIdAssigner { + remote_dyn_filter_producer_id_allocator: RemoteDynFilterProducerIdAllocator, +} + +impl TreeNodeRewriter for MergeScanRemoteDynFilterProducerIdAssigner { + type Node = LogicalPlan; + + fn f_up(&mut self, node: Self::Node) -> DfResult> { + let LogicalPlan::Extension(extension) = &node else { + return Ok(Transformed::no(node)); + }; + let Some(merge_scan) = extension + .node + .as_any() + .downcast_ref::() + else { + return Ok(Transformed::no(node)); + }; + + Ok(Transformed::yes( + merge_scan + .clone() + .with_remote_dyn_filter_producer_id( + self.remote_dyn_filter_producer_id_allocator.allocate(), + ) + .into_logical_plan(), + )) } } @@ -330,21 +341,9 @@ struct PlanRewriter { /// so that we can push down the `Sort` plan as much as possible. expand_on_next_part_cond_trans_commutative: bool, new_child_plan: Option, - producer_scope_allocator: ProducerScopeAllocator, } impl PlanRewriter { - fn new(producer_scope_allocator: ProducerScopeAllocator) -> Self { - Self { - producer_scope_allocator, - ..Default::default() - } - } - - fn allocate_remote_dyn_filter_producer_scope_id(&self) -> ProducerScopeId { - self.producer_scope_allocator.allocate() - } - fn get_parent(&self) -> Option<&LogicalPlan> { // level starts from 1, it's safe to minus by 1 self.stack @@ -639,14 +638,12 @@ impl PlanRewriter { ); // add merge scan as the new root - let producer_scope_id = self.allocate_remote_dyn_filter_producer_scope_id(); let mut node = MergeScanLogicalPlan::new( on_node.clone(), false, // at this stage, the partition cols should be set // treat it as non-partitioned if None self.partition_cols.clone().unwrap_or_default(), - producer_scope_id, ) .into_logical_plan(); diff --git a/src/query/src/dist_plan/analyzer/fallback.rs b/src/query/src/dist_plan/analyzer/fallback.rs index 80e25597aa..4cefbbc768 100644 --- a/src/query/src/dist_plan/analyzer/fallback.rs +++ b/src/query/src/dist_plan/analyzer/fallback.rs @@ -27,31 +27,15 @@ use datafusion_expr::LogicalPlan; use table::metadata::TableType; use table::table::adapter::DfTableProviderAdapter; -use crate::dist_plan::analyzer::{ - AliasMapping, OTHER_PHY_PART_COL_PLACEHOLDER, ProducerScopeAllocator, -}; -use crate::dist_plan::{MergeScanLogicalPlan, ProducerScopeId}; +use crate::dist_plan::MergeScanLogicalPlan; +use crate::dist_plan::analyzer::{AliasMapping, OTHER_PHY_PART_COL_PLACEHOLDER}; /// FallbackPlanRewriter is a plan rewriter that will only push down table scan node /// This is used when `PlanRewriter` produce errors when trying to rewrite the plan /// This is a temporary solution, and will be removed once we have a more robust plan rewriter /// It will traverse the logical plan and rewrite table scan node to merge scan node #[derive(Debug, Clone, Default)] -pub struct FallbackPlanRewriter { - producer_scope_allocator: ProducerScopeAllocator, -} - -impl FallbackPlanRewriter { - pub fn new(producer_scope_allocator: ProducerScopeAllocator) -> Self { - Self { - producer_scope_allocator, - } - } - - fn allocate_remote_dyn_filter_producer_scope_id(&self) -> ProducerScopeId { - self.producer_scope_allocator.allocate() - } -} +pub struct FallbackPlanRewriter; impl TreeNodeRewriter for FallbackPlanRewriter { type Node = LogicalPlan; @@ -121,7 +105,6 @@ impl TreeNodeRewriter for FallbackPlanRewriter { // at this stage, the partition cols should be set // treat it as non-partitioned if None partition_cols.clone().unwrap_or_default(), - self.allocate_remote_dyn_filter_producer_scope_id(), ) .into_logical_plan(); Ok(Transformed::yes(node)) diff --git a/src/query/src/dist_plan/analyzer/test.rs b/src/query/src/dist_plan/analyzer/test.rs index d12132b947..6321337858 100644 --- a/src/query/src/dist_plan/analyzer/test.rs +++ b/src/query/src/dist_plan/analyzer/test.rs @@ -30,7 +30,7 @@ use datafusion::functions_aggregate::expr_fn::avg; use datafusion::functions_aggregate::min_max::{max, min}; use datafusion::prelude::SessionContext; use datafusion_common::{JoinType, ScalarValue}; -use datafusion_expr::expr::ScalarFunction; +use datafusion_expr::expr::{Exists, ScalarFunction}; use datafusion_expr::{ AggregateUDF, Expr, ExprSchemable as _, LogicalPlanBuilder, Operator, Subquery, binary_expr, col, lit, @@ -55,40 +55,48 @@ use table::{Table, TableRef}; use super::*; -fn collect_merge_scan_producer_scope_ids( +fn collect_merge_scan_remote_dyn_filter_producer_ids( plan: &LogicalPlan, - scopes: &mut BTreeSet, + producer_ids: &mut BTreeSet, ) { - if let LogicalPlan::Extension(extension) = plan - && let Some(merge_scan) = extension - .node - .as_any() - .downcast_ref::() - { - scopes.insert(merge_scan.producer_scope_id()); - } + let mut producer_id_list = Vec::new(); + collect_merge_scan_remote_dyn_filter_producer_id_list(plan, &mut producer_id_list); + producer_ids.extend(producer_id_list); +} - for input in plan.inputs() { - collect_merge_scan_producer_scope_ids(input, scopes); +struct MergeScanRemoteDynFilterProducerIdCollector<'a> { + producer_ids: &'a mut Vec, +} + +impl TreeNodeRewriter for MergeScanRemoteDynFilterProducerIdCollector<'_> { + type Node = LogicalPlan; + + fn f_up(&mut self, node: Self::Node) -> DfResult> { + if let LogicalPlan::Extension(extension) = &node + && let Some(merge_scan) = extension + .node + .as_any() + .downcast_ref::() + { + self.producer_ids.push( + merge_scan + .remote_dyn_filter_producer_id() + .expect("MergeScan remote dynamic filter producer id must be assigned"), + ); + } + + Ok(Transformed::no(node)) } } -fn collect_merge_scan_producer_scope_id_list( +fn collect_merge_scan_remote_dyn_filter_producer_id_list( plan: &LogicalPlan, - scopes: &mut Vec, + producer_ids: &mut Vec, ) { - if let LogicalPlan::Extension(extension) = plan - && let Some(merge_scan) = extension - .node - .as_any() - .downcast_ref::() - { - scopes.push(merge_scan.producer_scope_id()); - } - - for input in plan.inputs() { - collect_merge_scan_producer_scope_id_list(input, scopes); - } + let _ = plan + .clone() + .rewrite_with_subqueries(&mut MergeScanRemoteDynFilterProducerIdCollector { producer_ids }) + .unwrap(); } pub(crate) struct TestTable; @@ -1399,7 +1407,7 @@ fn test_simplify_select_now_expression() { } #[test] -fn sibling_merge_scan_producers_have_unique_scope_ids() { +fn sibling_merge_scans_have_unique_remote_dyn_filter_producer_ids() { init_default_ut_logging(); let left_table = TestTable::table_with_name(0, "left_table".to_string()); let right_table = TestTable::table_with_name(1, "right_table".to_string()); @@ -1436,19 +1444,19 @@ fn sibling_merge_scan_producers_have_unique_scope_ids() { let config = ConfigOptions::default(); let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap(); - let mut scopes = Vec::new(); - collect_merge_scan_producer_scope_id_list(&result, &mut scopes); - let unique_scopes = scopes.iter().copied().collect::>(); + let mut producer_ids = Vec::new(); + collect_merge_scan_remote_dyn_filter_producer_id_list(&result, &mut producer_ids); + let unique_producer_ids = producer_ids.iter().copied().collect::>(); assert!( - scopes.len() >= 2, - "Expected at least 2 ProducerScopeIds, got {}: {scopes:?}", - scopes.len() + producer_ids.len() >= 2, + "Expected at least 2 RemoteDynFilterProducerIds, got {}: {producer_ids:?}", + producer_ids.len() ); assert_eq!( - scopes.len(), - unique_scopes.len(), - "Expected all sibling ProducerScopeIds to be unique, got scopes: {scopes:?}" + producer_ids.len(), + unique_producer_ids.len(), + "Expected all sibling RemoteDynFilterProducerIds to be unique, got ids: {producer_ids:?}" ); } @@ -1916,7 +1924,7 @@ fn transform_sort_subquery_alias() { } #[test] -fn producer_scope_ids_do_not_collide_between_subquery_and_outer_plan() { +fn remote_dyn_filter_producer_ids_do_not_collide_between_subquery_and_outer_plan() { let test_table = TestTable::table_with_name(0, "numbers".to_string()); let table_source = Arc::new(DefaultTableSource::new(Arc::new( DfTableProviderAdapter::new(test_table), @@ -1931,28 +1939,21 @@ fn producer_scope_ids_do_not_collide_between_subquery_and_outer_plan() { outer_ref_columns: Default::default(), spans: Default::default(), }; - let allocator = ProducerScopeAllocator::default(); - let rewritten_subquery = - DistPlannerAnalyzer::handle_subquery(subquery, allocator.clone()).unwrap(); let outer_plan = LogicalPlanBuilder::scan_with_filters("outer", table_source, None, vec![]) + .unwrap() + .filter(Expr::Exists(Exists { + subquery, + negated: false, + })) .unwrap() .build() .unwrap(); - let rewritten_outer = DistPlannerAnalyzer {} - .try_push_down(outer_plan, allocator) - .unwrap(); + let rewritten = DistPlannerAnalyzer {}.try_push_down(outer_plan).unwrap(); - let mut subquery_scopes = BTreeSet::new(); - collect_merge_scan_producer_scope_ids( - rewritten_subquery.subquery.as_ref(), - &mut subquery_scopes, - ); - let mut outer_scopes = BTreeSet::new(); - collect_merge_scan_producer_scope_ids(&rewritten_outer, &mut outer_scopes); + let mut producer_ids = BTreeSet::new(); + collect_merge_scan_remote_dyn_filter_producer_ids(&rewritten, &mut producer_ids); - assert_eq!(subquery_scopes.len(), 1); - assert_eq!(outer_scopes.len(), 1); - assert_ne!(subquery_scopes, outer_scopes); + assert_eq!(producer_ids.len(), 2); } #[test] diff --git a/src/query/src/dist_plan/dyn_filter_bridge.rs b/src/query/src/dist_plan/dyn_filter_bridge.rs index 9b6d74dd51..fe9ec39e3f 100644 --- a/src/query/src/dist_plan/dyn_filter_bridge.rs +++ b/src/query/src/dist_plan/dyn_filter_bridge.rs @@ -20,7 +20,6 @@ use common_query::request::{ INITIAL_REMOTE_DYN_FILTER_REGS_MAX_TOTAL_PROTO_BYTES, InitialDynFilterReg, InitialDynFilterRegs, InitialDynFilterSnapshot, }; -use datafusion::execution::TaskContext; use datafusion_common::Result; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; @@ -28,8 +27,7 @@ use session::context::{QueryContext, QueryContextRef}; use store_api::storage::RegionId; use crate::dist_plan::filter_id::build_remote_dyn_filter_id; -use crate::dist_plan::{FilterId, ProducerScopeId, QueryDynFilterRegistry, Subscriber}; -use crate::query_engine::QueryEngineState; +use crate::dist_plan::{FilterId, QueryDynFilterRegistry, RemoteDynFilterProducerId, Subscriber}; #[derive(Debug, Clone)] pub(crate) struct CapturedDynFilter { @@ -48,7 +46,7 @@ pub(crate) struct RemoteDynFilterPushdown { } pub(crate) fn capture_remote_dyn_filters_for_pushdown( - producer_scope_id: ProducerScopeId, + remote_dyn_filter_producer_id: RemoteDynFilterProducerId, parent_filters: Vec>, ) -> RemoteDynFilterPushdown { let mut pushed_down = Vec::with_capacity(parent_filters.len()); @@ -60,8 +58,11 @@ pub(crate) fn capture_remote_dyn_filters_for_pushdown( continue; }; - match build_captured_dyn_filter(producer_scope_id, producer_local_ordinal, alive_dyn_filter) - { + match build_captured_dyn_filter( + remote_dyn_filter_producer_id, + producer_local_ordinal, + alive_dyn_filter, + ) { Ok(captured_dyn_filter) => { pushed_down.push(true); captured_dyn_filters.push(captured_dyn_filter); @@ -106,10 +107,6 @@ fn downcast_dynamic_filter( .ok() } -fn query_engine_state_from_task_context(context: &TaskContext) -> Option> { - context.session_config().get_extension() -} - pub(crate) fn register_dyn_filters_for_region( registry: &QueryDynFilterRegistry, region_id: RegionId, @@ -125,29 +122,8 @@ pub(crate) fn register_dyn_filters_for_region( } } -pub(crate) fn bridge_dyn_filters_for_region( - context: &TaskContext, - query_ctx: &QueryContextRef, - region_id: RegionId, - captured_dyn_filters: &[CapturedDynFilter], -) { - if captured_dyn_filters.is_empty() { - return; - } - - let Some(query_engine_state) = query_engine_state_from_task_context(context) else { - return; - }; - let Some(registry) = query_engine_state.get_or_init_remote_dyn_filter_registry(query_ctx) - else { - return; - }; - - register_dyn_filters_for_region(®istry, region_id, captured_dyn_filters); -} - fn build_captured_dyn_filter( - producer_scope_id: ProducerScopeId, + remote_dyn_filter_producer_id: RemoteDynFilterProducerId, producer_local_ordinal: usize, alive_dyn_filter: Arc, ) -> Result { @@ -156,8 +132,11 @@ fn build_captured_dyn_filter( .into_iter() .cloned() .collect::>(); - let filter_id = - build_remote_dyn_filter_id(producer_scope_id, producer_local_ordinal, &children)?; + let filter_id = build_remote_dyn_filter_id( + remote_dyn_filter_producer_id, + producer_local_ordinal, + &children, + )?; let initial_registration = InitialDynFilterReg::from_filter_id_and_children(filter_id.to_string(), &children)?; @@ -295,6 +274,7 @@ mod tests { use std::fmt; use std::hash::{Hash, Hasher}; + use datafusion::execution::TaskContext; use datafusion_common::ScalarValue; use datafusion_expr::ColumnarValue; use datafusion_physical_expr::expressions::{Column, lit}; @@ -372,18 +352,18 @@ mod tests { QueryId::from(Uuid::from_u128(value)) } - fn test_producer_scope(value: u64) -> ProducerScopeId { - ProducerScopeId::new(value) + fn test_remote_dyn_filter_producer_id(value: u64) -> RemoteDynFilterProducerId { + RemoteDynFilterProducerId::new(value) } fn test_captured_dyn_filter( - producer_scope_id: ProducerScopeId, + remote_dyn_filter_producer_id: RemoteDynFilterProducerId, producer_local_ordinal: usize, column_name: &str, column_index: usize, ) -> CapturedDynFilter { build_captured_dyn_filter( - producer_scope_id, + remote_dyn_filter_producer_id, producer_local_ordinal, Arc::new(DynamicFilterPhysicalExpr::new( vec![Arc::new(Column::new(column_name, column_index)) as Arc<_>], @@ -423,13 +403,20 @@ mod tests { )) as Arc, ]; - let producer_scope_id = test_producer_scope(42); - let captured = capture_remote_dyn_filters_for_pushdown(producer_scope_id, parent_filters) - .captured_dyn_filters; + let remote_dyn_filter_producer_id = test_remote_dyn_filter_producer_id(42); + let captured = + capture_remote_dyn_filters_for_pushdown(remote_dyn_filter_producer_id, parent_filters) + .captured_dyn_filters; assert_eq!(captured.len(), 2); - assert_eq!(captured[0].filter_id.producer_scope_id(), producer_scope_id); - assert_eq!(captured[1].filter_id.producer_scope_id(), producer_scope_id); + assert_eq!( + captured[0].filter_id.remote_dyn_filter_producer_id(), + remote_dyn_filter_producer_id + ); + assert_eq!( + captured[1].filter_id.remote_dyn_filter_producer_id(), + remote_dyn_filter_producer_id + ); assert_eq!(captured[0].filter_id.producer_ordinal(), 1); assert_eq!(captured[1].filter_id.producer_ordinal(), 3); } @@ -445,16 +432,17 @@ mod tests { Arc::new(Column::new("zone", 2)) as Arc, ]; - let producer_scope_id = test_producer_scope(42); - let pushdown = capture_remote_dyn_filters_for_pushdown(producer_scope_id, parent_filters); + let remote_dyn_filter_producer_id = test_remote_dyn_filter_producer_id(42); + let pushdown = + capture_remote_dyn_filters_for_pushdown(remote_dyn_filter_producer_id, parent_filters); assert_eq!(pushdown.pushed_down, vec![false, true, false]); assert_eq!(pushdown.captured_dyn_filters.len(), 1); assert_eq!( pushdown.captured_dyn_filters[0] .filter_id - .producer_scope_id(), - producer_scope_id + .remote_dyn_filter_producer_id(), + remote_dyn_filter_producer_id ); assert_eq!( pushdown.captured_dyn_filters[0] @@ -478,8 +466,10 @@ mod tests { )) as Arc]; - let pushdown = - capture_remote_dyn_filters_for_pushdown(test_producer_scope(42), parent_filters); + let pushdown = capture_remote_dyn_filters_for_pushdown( + test_remote_dyn_filter_producer_id(42), + parent_filters, + ); assert_eq!(pushdown.pushed_down, vec![false]); assert!(pushdown.captured_dyn_filters.is_empty()); @@ -493,8 +483,10 @@ mod tests { )) as Arc]; - let pushdown = - capture_remote_dyn_filters_for_pushdown(test_producer_scope(42), parent_filters); + let pushdown = capture_remote_dyn_filters_for_pushdown( + test_remote_dyn_filter_producer_id(42), + parent_filters, + ); assert_eq!(pushdown.pushed_down, vec![true]); assert!( @@ -514,8 +506,10 @@ mod tests { dyn_filter.update(lit(false) as _).unwrap(); let parent_filters = vec![dyn_filter as Arc]; - let pushdown = - capture_remote_dyn_filters_for_pushdown(test_producer_scope(42), parent_filters); + let pushdown = capture_remote_dyn_filters_for_pushdown( + test_remote_dyn_filter_producer_id(42), + parent_filters, + ); assert_eq!(pushdown.pushed_down, vec![true]); let snapshot = pushdown.captured_dyn_filters[0] @@ -540,8 +534,10 @@ mod tests { as Arc, ]; - let pushdown = - capture_remote_dyn_filters_for_pushdown(test_producer_scope(42), parent_filters); + let pushdown = capture_remote_dyn_filters_for_pushdown( + test_remote_dyn_filter_producer_id(42), + parent_filters, + ); assert_eq!(pushdown.pushed_down, vec![true, true]); assert_eq!(pushdown.captured_dyn_filters.len(), 2); @@ -564,8 +560,10 @@ mod tests { }) .collect::>(); - let pushdown = - capture_remote_dyn_filters_for_pushdown(test_producer_scope(42), parent_filters); + let pushdown = capture_remote_dyn_filters_for_pushdown( + test_remote_dyn_filter_producer_id(42), + parent_filters, + ); assert!(pushdown.captured_dyn_filters.is_empty()); assert_eq!(pushdown.pushed_down, vec![false; TOO_MANY_INITIAL_REGS]); @@ -584,8 +582,10 @@ mod tests { }) .collect::>(); - let pushdown = - capture_remote_dyn_filters_for_pushdown(test_producer_scope(42), parent_filters); + let pushdown = capture_remote_dyn_filters_for_pushdown( + test_remote_dyn_filter_producer_id(42), + parent_filters, + ); assert!(pushdown.captured_dyn_filters.is_empty()); assert_eq!(pushdown.pushed_down, vec![false; TOO_MANY_INITIAL_REGS]); @@ -595,7 +595,7 @@ mod tests { fn register_dyn_filters_for_region_reuses_existing_entry() { let registry = QueryDynFilterRegistry::new(test_query_id(1)); let captured_dyn_filters = vec![test_captured_dyn_filter( - test_producer_scope(42), + test_remote_dyn_filter_producer_id(42), 2, "host", 0, @@ -609,8 +609,8 @@ mod tests { assert_eq!(registry.entry_count(), 1); let entry = registry.entries().pop().unwrap(); assert_eq!( - entry.filter_id().producer_scope_id(), - test_producer_scope(42) + entry.filter_id().remote_dyn_filter_producer_id(), + test_remote_dyn_filter_producer_id(42) ); assert_eq!(entry.filter_id().producer_ordinal(), 2); let subscribers = entry.subscribers(); @@ -628,21 +628,22 @@ mod tests { } #[test] - fn register_dyn_filters_for_region_keeps_independent_producer_scopes_distinct() { + fn register_dyn_filters_for_region_keeps_independent_producer_ids_distinct() { let registry = QueryDynFilterRegistry::new(test_query_id(1)); let region_id = RegionId::new(1024, 7); - let make_filter = - |producer_scope_id| test_captured_dyn_filter(producer_scope_id, 2, "host", 0); + let make_filter = |remote_dyn_filter_producer_id| { + test_captured_dyn_filter(remote_dyn_filter_producer_id, 2, "host", 0) + }; register_dyn_filters_for_region( ®istry, region_id, - &[make_filter(test_producer_scope(42))], + &[make_filter(test_remote_dyn_filter_producer_id(42))], ); register_dyn_filters_for_region( ®istry, region_id, - &[make_filter(test_producer_scope(43))], + &[make_filter(test_remote_dyn_filter_producer_id(43))], ); assert_eq!(registry.entry_count(), 2); @@ -651,7 +652,7 @@ mod tests { #[test] fn query_context_includes_region_initial_dyn_filter_regs() { let captured_dyn_filters = vec![test_captured_dyn_filter( - test_producer_scope(42), + test_remote_dyn_filter_producer_id(42), 2, "host", 0, @@ -691,8 +692,8 @@ mod tests { #[test] fn query_context_drops_initial_regs_when_duplicate_filter_ids_exceed_bounds() { let captured_dyn_filters = vec![ - test_captured_dyn_filter(test_producer_scope(42), 2, "host", 0), - test_captured_dyn_filter(test_producer_scope(42), 2, "host", 0), + test_captured_dyn_filter(test_remote_dyn_filter_producer_id(42), 2, "host", 0), + test_captured_dyn_filter(test_remote_dyn_filter_producer_id(42), 2, "host", 0), ]; let region_id = RegionId::new(1024, 7); let query_ctx = QueryContext::arc(); diff --git a/src/query/src/dist_plan/filter_id.rs b/src/query/src/dist_plan/filter_id.rs index b16fe48ac4..ba22907cbf 100644 --- a/src/query/src/dist_plan/filter_id.rs +++ b/src/query/src/dist_plan/filter_id.rs @@ -51,10 +51,15 @@ impl FromStr for FilterFingerprint { } } +/// Query-local identity for one remote dynamic filter producer. +/// +/// The analyzer assigns a new id to each frontend-side `MergeScan` rewrite so filters from +/// independent producers cannot collide even when their producer-local ordinals and child +/// fingerprints match. #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct ProducerScopeId(u64); +pub struct RemoteDynFilterProducerId(u64); -impl ProducerScopeId { +impl RemoteDynFilterProducerId { pub fn new(value: u64) -> Self { Self(value) } @@ -64,13 +69,13 @@ impl ProducerScopeId { } } -impl Display for ProducerScopeId { +impl Display for RemoteDynFilterProducerId { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "{:016x}", self.0) } } -impl FromStr for ProducerScopeId { +impl FromStr for RemoteDynFilterProducerId { type Err = ParseIntError; fn from_str(s: &str) -> Result { @@ -80,7 +85,7 @@ impl FromStr for ProducerScopeId { #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct FilterId { - producer_scope_id: ProducerScopeId, + remote_dyn_filter_producer_id: RemoteDynFilterProducerId, producer_ordinal: u32, children_fingerprint: FilterFingerprint, } @@ -91,12 +96,12 @@ pub struct FilterId { impl FilterId { pub fn new( - producer_scope_id: ProducerScopeId, + remote_dyn_filter_producer_id: RemoteDynFilterProducerId, producer_ordinal: u32, children_fingerprint: FilterFingerprint, ) -> Self { Self { - producer_scope_id, + remote_dyn_filter_producer_id, producer_ordinal, children_fingerprint, } @@ -106,8 +111,8 @@ impl FilterId { self.producer_ordinal } - pub fn producer_scope_id(&self) -> ProducerScopeId { - self.producer_scope_id + pub fn remote_dyn_filter_producer_id(&self) -> RemoteDynFilterProducerId { + self.remote_dyn_filter_producer_id } pub fn children_fingerprint(&self) -> FilterFingerprint { @@ -120,7 +125,7 @@ impl Display for FilterId { write!( f, "{}:{}:{}", - self.producer_scope_id, self.producer_ordinal, self.children_fingerprint + self.remote_dyn_filter_producer_id, self.producer_ordinal, self.children_fingerprint ) } } @@ -130,10 +135,10 @@ impl FromStr for FilterId { fn from_str(s: &str) -> Result { let mut parts = s.split(':'); - let producer_scope_id = parts + let remote_dyn_filter_producer_id = parts .next() .ok_or(ParseFilterIdError)? - .parse::() + .parse::() .map_err(|_| ParseFilterIdError)?; let producer_local_ordinal = parts .next() @@ -150,7 +155,7 @@ impl FromStr for FilterId { } Ok(Self::new( - producer_scope_id, + remote_dyn_filter_producer_id, producer_local_ordinal, children_fingerprint, )) @@ -168,15 +173,12 @@ impl Display for ParseFilterIdError { /// Builds the query-local remote dynamic filter identity. /// -/// The identity is `producer scope + producer-local ordinal + canonicalized child fingerprint`. +/// The identity is `remote dynamic filter producer id + producer-local ordinal + canonicalized child fingerprint`. /// Subscriber routing details such as `region_id` and `partition` stay outside this key so they /// can remain in the later fanout/subscriber map instead of splitting one shared remote filter state. -/// -/// NOTE(remote-dyn-filter): This id is generated once on the source side and then propagated. -/// Consumers should reuse the propagated `FilterId` instead of independently recomputing it from -/// local state. +/// See [`FilterId`] for the propagation contract. pub(crate) fn build_remote_dyn_filter_id( - producer_scope_id: ProducerScopeId, + remote_dyn_filter_producer_id: RemoteDynFilterProducerId, producer_local_ordinal: usize, children: &[Arc], ) -> Result { @@ -186,7 +188,7 @@ pub(crate) fn build_remote_dyn_filter_id( DataFusionError::Execution("producer ordinal out of range for filter id".to_string()) })?; Ok(FilterId::new( - producer_scope_id, + remote_dyn_filter_producer_id, producer_local_ordinal, children_fingerprint, )) @@ -228,7 +230,11 @@ mod tests { #[test] fn filter_id_round_trips_through_string() { - let filter_id = FilterId::new(ProducerScopeId::new(42), 3, FilterFingerprint::new(0xabc)); + let filter_id = FilterId::new( + RemoteDynFilterProducerId::new(42), + 3, + FilterFingerprint::new(0xabc), + ); let encoded = filter_id.to_string(); assert_eq!(encoded, "000000000000002a:3:0000000000000abc"); @@ -249,37 +255,48 @@ mod tests { #[test] fn remote_dyn_filter_id_is_stable_for_equivalent_children() { - let producer_scope_id = ProducerScopeId::new(42); - let first = - build_remote_dyn_filter_id(producer_scope_id, 3, &test_children(&["host", "pod"])) - .unwrap(); - let second = - build_remote_dyn_filter_id(producer_scope_id, 3, &test_children(&["host", "pod"])) - .unwrap(); + let remote_dyn_filter_producer_id = RemoteDynFilterProducerId::new(42); + let first = build_remote_dyn_filter_id( + remote_dyn_filter_producer_id, + 3, + &test_children(&["host", "pod"]), + ) + .unwrap(); + let second = build_remote_dyn_filter_id( + remote_dyn_filter_producer_id, + 3, + &test_children(&["host", "pod"]), + ) + .unwrap(); assert_eq!(first, second); } #[test] - fn remote_dyn_filter_id_changes_when_producer_scope_changes() { + fn remote_dyn_filter_id_changes_when_remote_dyn_filter_producer_changes() { let children = test_children(&["host", "pod"]); - let baseline = build_remote_dyn_filter_id(ProducerScopeId::new(42), 3, &children).unwrap(); - let different_scope = - build_remote_dyn_filter_id(ProducerScopeId::new(43), 3, &children).unwrap(); + let baseline = + build_remote_dyn_filter_id(RemoteDynFilterProducerId::new(42), 3, &children).unwrap(); + let different_producer_id = + build_remote_dyn_filter_id(RemoteDynFilterProducerId::new(43), 3, &children).unwrap(); - assert_ne!(baseline, different_scope); + assert_ne!(baseline, different_producer_id); } #[test] fn remote_dyn_filter_id_changes_when_identity_inputs_change() { let children = test_children(&["host", "pod"]); - let producer_scope_id = ProducerScopeId::new(42); - let baseline = build_remote_dyn_filter_id(producer_scope_id, 3, &children).unwrap(); + let remote_dyn_filter_producer_id = RemoteDynFilterProducerId::new(42); + let baseline = + build_remote_dyn_filter_id(remote_dyn_filter_producer_id, 3, &children).unwrap(); let different_ordinal = - build_remote_dyn_filter_id(producer_scope_id, 4, &children).unwrap(); - let different_children = - build_remote_dyn_filter_id(producer_scope_id, 3, &test_children(&["pod", "host"])) - .unwrap(); + build_remote_dyn_filter_id(remote_dyn_filter_producer_id, 4, &children).unwrap(); + let different_children = build_remote_dyn_filter_id( + remote_dyn_filter_producer_id, + 3, + &test_children(&["pod", "host"]), + ) + .unwrap(); assert_ne!(baseline, different_ordinal); assert_ne!(baseline, different_children); @@ -287,19 +304,22 @@ mod tests { #[test] fn remote_dyn_filter_id_supports_empty_children() { - let producer_scope_id = ProducerScopeId::new(42); - let first = build_remote_dyn_filter_id(producer_scope_id, 1, &[]).unwrap(); - let second = build_remote_dyn_filter_id(producer_scope_id, 1, &[]).unwrap(); + let remote_dyn_filter_producer_id = RemoteDynFilterProducerId::new(42); + let first = build_remote_dyn_filter_id(remote_dyn_filter_producer_id, 1, &[]).unwrap(); + let second = build_remote_dyn_filter_id(remote_dyn_filter_producer_id, 1, &[]).unwrap(); assert_eq!(first, second); - assert_eq!(first.producer_scope_id(), producer_scope_id); + assert_eq!( + first.remote_dyn_filter_producer_id(), + remote_dyn_filter_producer_id + ); assert_eq!(first.producer_ordinal(), 1); assert_eq!(first.children_fingerprint(), second.children_fingerprint()); } #[test] fn remote_dyn_filter_id_rejects_out_of_range_producer_ordinal() { - let error = build_remote_dyn_filter_id(ProducerScopeId::new(42), usize::MAX, &[]) + let error = build_remote_dyn_filter_id(RemoteDynFilterProducerId::new(42), usize::MAX, &[]) .unwrap_err() .to_string(); diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index c148f7baff..9d0526c769 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -54,10 +54,10 @@ use tracing::{Instrument, Span}; use crate::dist_plan::analyzer::AliasMapping; use crate::dist_plan::analyzer::utils::patch_batch_timezone; use crate::dist_plan::dyn_filter_bridge::{ - CapturedDynFilter, bridge_dyn_filters_for_region, capture_remote_dyn_filters_for_pushdown, - query_context_with_initial_dyn_filter_regs, + CapturedDynFilter, capture_remote_dyn_filters_for_pushdown, + query_context_with_initial_dyn_filter_regs, register_dyn_filters_for_region, }; -use crate::dist_plan::{ProducerScopeId, RemoteDynFilterRegistryLease}; +use crate::dist_plan::{RemoteDynFilterProducerId, RemoteDynFilterRegistryLease}; use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS}; use crate::options::FlowQueryExtensions; use crate::query_engine::QueryEngineState; @@ -67,7 +67,7 @@ fn query_engine_state_from_task_context(context: &TaskContext) -> Option, } impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan { @@ -133,20 +134,23 @@ impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan { } impl MergeScanLogicalPlan { - pub fn new( - input: LogicalPlan, - is_placeholder: bool, - partition_cols: AliasMapping, - producer_scope_id: ProducerScopeId, - ) -> Self { + pub fn new(input: LogicalPlan, is_placeholder: bool, partition_cols: AliasMapping) -> Self { Self { input, is_placeholder, partition_cols, - producer_scope_id, + remote_dyn_filter_producer_id: None, } } + pub(crate) fn with_remote_dyn_filter_producer_id( + mut self, + remote_dyn_filter_producer_id: RemoteDynFilterProducerId, + ) -> Self { + self.remote_dyn_filter_producer_id = Some(remote_dyn_filter_producer_id); + self + } + pub fn name() -> &'static str { "MergeScan" } @@ -170,8 +174,8 @@ impl MergeScanLogicalPlan { &self.partition_cols } - pub fn producer_scope_id(&self) -> ProducerScopeId { - self.producer_scope_id + pub fn remote_dyn_filter_producer_id(&self) -> Option { + self.remote_dyn_filter_producer_id } } @@ -189,7 +193,12 @@ pub struct MergeScanExec { /// Metrics for each partition partition_metrics: Arc>>, query_ctx: QueryContextRef, - remote_dyn_filter_producer_scope_id: ProducerScopeId, + /// Optional because remote dynamic filters must fail open. + /// + /// If producer id assignment is missing, `MergeScanExec` still executes the query normally and + /// only skips RDF pushdown/registration while keeping parent-side filters as the correctness + /// fallback. + remote_dyn_filter_producer_id: Option, captured_remote_dyn_filters: Arc>>, target_partition: usize, partition_cols: AliasMapping, @@ -217,7 +226,7 @@ impl MergeScanExec { query_ctx: QueryContextRef, target_partition: usize, partition_cols: AliasMapping, - producer_scope_id: ProducerScopeId, + remote_dyn_filter_producer_id: Option, ) -> Result { // TODO(CookiePieWw): Initially we removed the metadata from the schema in #2000, but we have to // keep it for #4619 to identify json type in src/datatypes/src/schema/column_schema.rs. @@ -290,7 +299,7 @@ impl MergeScanExec { partition_metrics: Arc::default(), properties, query_ctx, - remote_dyn_filter_producer_scope_id: producer_scope_id, + remote_dyn_filter_producer_id, captured_remote_dyn_filters: Arc::default(), target_partition, partition_cols, @@ -318,14 +327,14 @@ impl MergeScanExec { let current_channel = self.query_ctx.channel(); let read_preference = self.query_ctx.read_preference(); let explain_verbose = self.query_ctx.explain_verbose(); - let remote_dyn_filter_registry_cleanup = acquire_remote_dyn_filter_registry_cleanup( + let remote_dyn_filter_registry_lease = acquire_remote_dyn_filter_registry_lease( context.as_ref(), &query_ctx, &captured_remote_dyn_filters, ); let stream = Box::pin(stream!({ - let _remote_dyn_filter_registry_cleanup = remote_dyn_filter_registry_cleanup; + let remote_dyn_filter_registry_lease = remote_dyn_filter_registry_lease; // only report metrics once for each MergeScan if partition == 0 { MERGE_SCAN_REGIONS.observe(regions.len() as f64); @@ -341,12 +350,15 @@ impl MergeScanExec { .step_by(target_partition) .copied() { - bridge_dyn_filters_for_region( - context.as_ref(), - &query_ctx, - region_id, - &captured_remote_dyn_filters, - ); + if let Some(remote_dyn_filter_registry_lease) = + remote_dyn_filter_registry_lease.as_ref() + { + register_dyn_filters_for_region( + remote_dyn_filter_registry_lease.registry(), + region_id, + &captured_remote_dyn_filters, + ); + } let region_span = tracing_context.attach(tracing::info_span!( parent: &Span::current(), @@ -531,7 +543,7 @@ impl MergeScanExec { sub_stage_metrics: self.sub_stage_metrics.clone(), partition_metrics: self.partition_metrics.clone(), query_ctx: self.query_ctx.clone(), - remote_dyn_filter_producer_scope_id: self.remote_dyn_filter_producer_scope_id, + remote_dyn_filter_producer_id: self.remote_dyn_filter_producer_id, captured_remote_dyn_filters: self.captured_remote_dyn_filters.clone(), target_partition: self.target_partition, partition_cols: self.partition_cols.clone(), @@ -592,8 +604,8 @@ impl MergeScanExec { #[cfg(test)] impl MergeScanExec { - fn remote_dyn_filter_producer_scope_id(&self) -> ProducerScopeId { - self.remote_dyn_filter_producer_scope_id + fn remote_dyn_filter_producer_id(&self) -> Option { + self.remote_dyn_filter_producer_id } } @@ -715,10 +727,22 @@ impl ExecutionPlan for MergeScanExec { .into_iter() .map(|filter| filter.filter) .collect::>(); - let remote_dyn_filter_pushdown = capture_remote_dyn_filters_for_pushdown( - self.remote_dyn_filter_producer_scope_id, - parent_filters, - ); + let Some(remote_dyn_filter_producer_id) = self.remote_dyn_filter_producer_id else { + // RDF is an optimization: missing producer identity must only disable RDF, never block + // normal query execution or remove the parent-side filter fallback. + common_telemetry::warn!( + "MergeScan remote dynamic filter producer id is not assigned; skipping remote dynamic filter pushdown" + ); + self.captured_remote_dyn_filters.lock().unwrap().clear(); + let new_self = Arc::new(self.clone()); + + return Ok(FilterPushdownPropagation { + filters: parent_filters.into_iter().map(|_| PushedDown::No).collect(), + updated_node: Some(new_self), + }); + }; + let remote_dyn_filter_pushdown = + capture_remote_dyn_filters_for_pushdown(remote_dyn_filter_producer_id, parent_filters); *self.captured_remote_dyn_filters.lock().unwrap() = remote_dyn_filter_pushdown.captured_dyn_filters; let new_self = Arc::new(self.clone()); @@ -881,7 +905,6 @@ mod tests { let registry_manager = Arc::new(DynFilterRegistryManager::default()); let query_id = test_query_id(1); - registry_manager.get_or_init(query_id); let first = registry_manager.acquire_lease(query_id); let second = registry_manager.acquire_lease(query_id); @@ -939,8 +962,8 @@ mod tests { } #[test] - fn try_with_new_distribution_preserves_producer_scope_id() { - let producer_scope_id = ProducerScopeId::new(42); + fn try_with_new_distribution_preserves_remote_dyn_filter_producer_id() { + let remote_dyn_filter_producer_id = RemoteDynFilterProducerId::new(42); // Build a plan whose schema contains "col1" let plan = LogicalPlanBuilder::empty(true) @@ -976,13 +999,13 @@ mod tests { query_ctx, target_partition, partition_cols, - producer_scope_id, + Some(remote_dyn_filter_producer_id), ) .unwrap(); assert_eq!( - exec.remote_dyn_filter_producer_scope_id(), - producer_scope_id + exec.remote_dyn_filter_producer_id(), + Some(remote_dyn_filter_producer_id) ); // A distribution that differs from the current partitioning but shares a @@ -998,15 +1021,15 @@ mod tests { .expect("expected a cloned exec with overlapping partition col"); assert_eq!( - cloned.remote_dyn_filter_producer_scope_id(), - producer_scope_id, - "try_with_new_distribution must preserve producer scope id" + cloned.remote_dyn_filter_producer_id(), + Some(remote_dyn_filter_producer_id), + "try_with_new_distribution must preserve remote dynamic filter producer id" ); } #[test] fn remote_dyn_filter_preflight_keeps_parent_filter_until_dn_runtime_is_ready() { - let producer_scope_id = ProducerScopeId::new(42); + let remote_dyn_filter_producer_id = RemoteDynFilterProducerId::new(42); let plan = LogicalPlanBuilder::empty(true) .project(vec![lit(1i32).alias("col1")]) .unwrap() @@ -1029,7 +1052,7 @@ mod tests { query_ctx, 1, AliasMapping::new(), - producer_scope_id, + Some(remote_dyn_filter_producer_id), ) .unwrap(); let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new( diff --git a/src/query/src/dist_plan/planner.rs b/src/query/src/dist_plan/planner.rs index 52cf146aad..e6776b7039 100644 --- a/src/query/src/dist_plan/planner.rs +++ b/src/query/src/dist_plan/planner.rs @@ -178,7 +178,7 @@ impl ExtensionPlanner for DistExtensionPlanner { query_ctx, session_state.config().target_partitions(), merge_scan.partition_cols().clone(), - merge_scan.producer_scope_id(), + merge_scan.remote_dyn_filter_producer_id(), )?; Ok(Some(Arc::new(merge_scan_plan) as _)) } diff --git a/src/query/src/dist_plan/remote_dyn_filter_registry.rs b/src/query/src/dist_plan/remote_dyn_filter_registry.rs index 694763a59c..4108c03c11 100644 --- a/src/query/src/dist_plan/remote_dyn_filter_registry.rs +++ b/src/query/src/dist_plan/remote_dyn_filter_registry.rs @@ -12,8 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::collections::{HashMap, HashSet}; use std::sync::{Arc, RwLock, Weak}; use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; @@ -23,7 +22,7 @@ use store_api::storage::RegionId; use crate::dist_plan::FilterId; /// Routing metadata for a remote dynamic filter subscriber. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Subscriber { region_id: RegionId, } @@ -42,6 +41,7 @@ impl Subscriber { #[derive(Debug, Clone)] pub enum EntryRegistration { Inserted(Arc), + /// The filter already existed; this contains the previously registered entry. Existing(Arc), } @@ -60,7 +60,7 @@ pub enum SubscriberRegistration { pub struct DynFilterEntry { filter_id: FilterId, alive_dyn_filter: Weak, - subscribers: RwLock>, + subscribers: RwLock>, } #[derive(Debug)] @@ -73,7 +73,7 @@ impl DynFilterEntry { Self { filter_id, alive_dyn_filter: Arc::downgrade(&alive_dyn_filter), - subscribers: RwLock::new(Vec::new()), + subscribers: RwLock::new(HashSet::new()), } } @@ -86,17 +86,12 @@ impl DynFilterEntry { } pub fn subscribers(&self) -> Vec { - self.subscribers.read().unwrap().clone() + self.subscribers.read().unwrap().iter().cloned().collect() } pub fn register_subscriber(&self, subscriber: Subscriber) -> bool { let mut subscribers = self.subscribers.write().unwrap(); - if subscribers.contains(&subscriber) { - return false; - } - - subscribers.push(subscriber); - true + subscribers.insert(subscriber) } } @@ -104,7 +99,6 @@ impl DynFilterEntry { #[derive(Debug)] pub struct QueryDynFilterRegistry { query_id: QueryId, - active_streams: AtomicUsize, inner: RwLock, } @@ -112,7 +106,6 @@ impl QueryDynFilterRegistry { pub fn new(query_id: QueryId) -> Self { Self { query_id, - active_streams: AtomicUsize::new(0), inner: RwLock::new(QueryDynFilterRegistryInner { entries: HashMap::new(), }), @@ -123,18 +116,6 @@ impl QueryDynFilterRegistry { self.query_id } - fn acquire_stream(&self) { - self.active_streams.fetch_add(1, Ordering::SeqCst); - } - - fn release_stream(&self) { - self.active_streams.fetch_sub(1, Ordering::SeqCst); - } - - fn active_stream_count(&self) -> usize { - self.active_streams.load(Ordering::SeqCst) - } - pub fn entry_count(&self) -> usize { self.inner.read().unwrap().entries.len() } @@ -185,10 +166,20 @@ impl QueryDynFilterRegistry { } } +/// Stream-scoped lease that keeps a query registry alive. +/// +/// The manager only stores a weak index. Holding this lease is the production path for owning a +/// strong registry reference; this keeps cleanup tied to stream lifetime instead of a hand-rolled +/// active-stream counter. #[derive(Debug)] pub struct RemoteDynFilterRegistryLease { registry_manager: Arc, - registry: Arc, + /// Always `Some` while the lease is alive. + /// + /// This is wrapped in `Option` only so `Drop` can `take()` and release the strong `Arc` before + /// checking whether the manager's weak index became dead. Without this explicit drop order, + /// the field would remain alive until after `Drop::drop` returns. + registry: Option>, } impl RemoteDynFilterRegistryLease { @@ -196,80 +187,167 @@ impl RemoteDynFilterRegistryLease { registry_manager: Arc, registry: Arc, ) -> Self { - registry.acquire_stream(); Self { registry_manager, - registry, + registry: Some(registry), } } + + pub fn registry(&self) -> &QueryDynFilterRegistry { + self.registry + .as_deref() + .expect("remote dyn filter registry lease must hold a registry") + } + + #[cfg(test)] + pub(crate) fn ptr_eq(&self, other: &Self) -> bool { + Arc::ptr_eq( + self.registry.as_ref().unwrap(), + other.registry.as_ref().unwrap(), + ) + } } impl Drop for RemoteDynFilterRegistryLease { fn drop(&mut self) { - self.registry.release_stream(); + let Some(registry) = self.registry.take() else { + return; + }; + let query_id = registry.query_id(); + let registry_weak = Arc::downgrade(®istry); + + // Release this lease's strong reference before checking whether the manager's weak entry is + // dead. If two leases drop concurrently, checking `Arc::strong_count` before the fields are + // dropped can make both drops observe each other and both skip cleanup. + drop(registry); + let _ = self .registry_manager - .remove_if_inactive(&self.registry.query_id(), &self.registry); + .remove_if_dropped_registry(&query_id, ®istry_weak); } } /// Query-engine manager for query-scoped remote dynamic filter registries. +/// +/// This is an index, not an owner: the map stores `Weak` pointers and active streams own the +/// registry through [`RemoteDynFilterRegistryLease`]. Keep production access lease-based so +/// dropping a lease releases one strong owner before the manager prunes a dead weak entry. #[derive(Debug, Default)] pub struct DynFilterRegistryManager { - registries: RwLock>>, + registries: RwLock>>, } impl DynFilterRegistryManager { - pub fn get(&self, query_id: &QueryId) -> Option> { - self.registries.read().unwrap().get(query_id).cloned() + #[cfg(test)] + fn get(&self, query_id: &QueryId) -> Option> { + let (registry, stale_entry) = { + let registries = self.registries.read().unwrap(); + let registry = registries.get(query_id)?; + + (registry.upgrade(), registry.clone()) + }; + + if registry.is_none() { + self.remove_stale_entry(query_id, &stale_entry); + } + + registry } #[cfg(test)] - fn remove(&self, query_id: &QueryId) -> Option> { + fn remove(&self, query_id: &QueryId) -> Option> { self.registries.write().unwrap().remove(query_id) } - fn remove_if_inactive( + fn remove_if_dropped_registry( &self, query_id: &QueryId, - registry: &Arc, - ) -> Option> { - let mut registries = self.registries.write().unwrap(); + dropped_registry: &Weak, + ) -> Option> { + let mut registries = self + .registries + .write() + .unwrap_or_else(|poisoned| poisoned.into_inner()); let current = registries.get(query_id)?; - if Arc::ptr_eq(current, registry) && registry.active_stream_count() == 0 { + // Compare `Weak` handles rather than raw pointers. The weak control block stays alive while + // either this drop's local weak handle or the manager's weak entry exists. `ptr_eq` prevents + // an old lease drop from removing a different registry installed for the same query id, and + // `upgrade().is_none()` ensures we only prune dead entries. + if current.ptr_eq(dropped_registry) && current.upgrade().is_none() { registries.remove(query_id) } else { None } } + #[cfg(test)] + fn remove_stale_entry( + &self, + query_id: &QueryId, + stale_registry: &Weak, + ) { + let mut registries = self.registries.write().unwrap(); + let Some(current) = registries.get(query_id) else { + return; + }; + + if current.ptr_eq(stale_registry) && current.upgrade().is_none() { + registries.remove(query_id); + } + } + + /// Acquires the stream-owned registry lease for `query_id`. + /// + /// This is the only production API that returns a live registry handle. A concurrent final + /// lease drop cannot remove the map entry between lookup and ownership transfer because the + /// upgraded `Arc` returned by `get_or_init` is already a strong owner before the manager lock is + /// released. pub fn acquire_lease(self: &Arc, query_id: QueryId) -> RemoteDynFilterRegistryLease { let registry = self.get_or_init(query_id); RemoteDynFilterRegistryLease::new(self.clone(), registry) } - pub fn get_or_init(&self, query_id: QueryId) -> Arc { + fn get_or_init(&self, query_id: QueryId) -> Arc { let mut registries = self.registries.write().unwrap(); - registries - .entry(query_id) - .or_insert_with(|| Arc::new(QueryDynFilterRegistry::new(query_id))) - .clone() + if let Some(registry) = registries.get(&query_id).and_then(Weak::upgrade) { + return registry; + } + + let registry = Arc::new(QueryDynFilterRegistry::new(query_id)); + registries.insert(query_id, Arc::downgrade(®istry)); + registry } + #[cfg(test)] pub fn registry_count(&self) -> usize { + // Snapshot helper for tests. Lifecycle decisions must not depend on this count; cleanup uses + // the lease-owned `Arc` and weak-entry pruning instead. + self.registries + .read() + .unwrap() + .values() + .filter(|registry| registry.strong_count() > 0) + .count() + } + + #[cfg(test)] + fn weak_entry_count(&self) -> usize { self.registries.read().unwrap().len() } } #[cfg(test)] mod tests { + use std::sync::Barrier; + use std::thread; + use datafusion_physical_expr::expressions::{Column, lit}; use uuid::Uuid; use super::*; - use crate::dist_plan::{FilterFingerprint, ProducerScopeId}; + use crate::dist_plan::{FilterFingerprint, RemoteDynFilterProducerId}; fn test_query_id(value: u128) -> QueryId { QueryId::from(Uuid::from_u128(value)) @@ -277,7 +355,7 @@ mod tests { fn test_filter_id(producer_ordinal: u32) -> FilterId { FilterId::new( - ProducerScopeId::new(42), + RemoteDynFilterProducerId::new(42), producer_ordinal, FilterFingerprint::new(0xabc), ) @@ -295,25 +373,32 @@ mod tests { #[test] fn registry_manager_returns_same_registry_for_same_query() { - let manager = DynFilterRegistryManager::default(); + let manager = Arc::new(DynFilterRegistryManager::default()); let query_id = test_query_id(1); - let first = manager.get_or_init(query_id); - let second = manager.get_or_init(query_id); + let first = manager.acquire_lease(query_id); + let second = manager.acquire_lease(query_id); - assert!(Arc::ptr_eq(&first, &second)); + assert!(first.ptr_eq(&second)); assert_eq!(manager.registry_count(), 1); + assert_eq!(manager.weak_entry_count(), 1); } #[test] fn registry_manager_removes_registry_for_query() { - let manager = DynFilterRegistryManager::default(); + let manager = Arc::new(DynFilterRegistryManager::default()); let query_id = test_query_id(1); - let registry = manager.get_or_init(query_id); + let lease = manager.acquire_lease(query_id); - assert!(Arc::ptr_eq(&manager.remove(&query_id).unwrap(), ®istry)); + assert!( + manager + .remove(&query_id) + .unwrap() + .ptr_eq(&Arc::downgrade(lease.registry.as_ref().unwrap())) + ); assert!(manager.get(&query_id).is_none()); assert_eq!(manager.registry_count(), 0); + assert_eq!(manager.weak_entry_count(), 0); } #[test] @@ -324,12 +409,16 @@ mod tests { let first = manager.acquire_lease(query_id); let second = manager.acquire_lease(query_id); + assert!(first.ptr_eq(&second)); assert_eq!(manager.registry_count(), 1); + assert_eq!(manager.weak_entry_count(), 1); drop(first); assert_eq!(manager.registry_count(), 1); + assert_eq!(manager.weak_entry_count(), 1); drop(second); assert_eq!(manager.registry_count(), 0); + assert_eq!(manager.weak_entry_count(), 0); } #[test] @@ -339,11 +428,141 @@ mod tests { let first = manager.acquire_lease(query_id); drop(first); + assert_eq!(manager.registry_count(), 0); + assert_eq!(manager.weak_entry_count(), 0); + let second = manager.acquire_lease(query_id); assert_eq!(manager.registry_count(), 1); + assert_eq!(manager.weak_entry_count(), 1); drop(second); assert_eq!(manager.registry_count(), 0); + assert_eq!(manager.weak_entry_count(), 0); + } + + #[test] + fn registry_manager_concurrent_final_lease_drop_cleans_weak_entry() { + let manager = Arc::new(DynFilterRegistryManager::default()); + let query_id = test_query_id(1); + let first = manager.acquire_lease(query_id); + let second = manager.acquire_lease(query_id); + let barrier = Arc::new(Barrier::new(3)); + + let first_barrier = barrier.clone(); + let first_drop = thread::spawn(move || { + first_barrier.wait(); + drop(first); + }); + + let second_barrier = barrier.clone(); + let second_drop = thread::spawn(move || { + second_barrier.wait(); + drop(second); + }); + + barrier.wait(); + first_drop.join().unwrap(); + second_drop.join().unwrap(); + + assert_eq!(manager.registry_count(), 0); + assert_eq!(manager.weak_entry_count(), 0); + } + + #[test] + fn registry_manager_concurrent_first_acquire_shares_registry() { + let manager = Arc::new(DynFilterRegistryManager::default()); + let query_id = test_query_id(1); + let worker_count = 8; + let barrier = Arc::new(Barrier::new(worker_count + 1)); + + let handles = (0..worker_count) + .map(|_| { + let manager = manager.clone(); + let barrier = barrier.clone(); + thread::spawn(move || { + barrier.wait(); + manager.acquire_lease(query_id) + }) + }) + .collect::>(); + + barrier.wait(); + let leases = handles + .into_iter() + .map(|handle| handle.join().unwrap()) + .collect::>(); + + let first = leases.first().unwrap(); + assert!(leases.iter().all(|lease| first.ptr_eq(lease))); + assert_eq!(manager.registry_count(), 1); + assert_eq!(manager.weak_entry_count(), 1); + + drop(leases); + assert_eq!(manager.registry_count(), 0); + assert_eq!(manager.weak_entry_count(), 0); + } + + #[test] + fn registry_manager_drop_racing_acquire_does_not_leave_stale_entry() { + let manager = Arc::new(DynFilterRegistryManager::default()); + let query_id = test_query_id(1); + + for _ in 0..64 { + let old_lease = manager.acquire_lease(query_id); + let barrier = Arc::new(Barrier::new(3)); + + let drop_barrier = barrier.clone(); + let drop_thread = thread::spawn(move || { + drop_barrier.wait(); + drop(old_lease); + }); + + let acquire_manager = manager.clone(); + let acquire_barrier = barrier.clone(); + let acquire_thread = thread::spawn(move || { + acquire_barrier.wait(); + acquire_manager.acquire_lease(query_id) + }); + + barrier.wait(); + drop_thread.join().unwrap(); + let new_lease = acquire_thread.join().unwrap(); + + assert_eq!(manager.registry_count(), 1); + assert_eq!(manager.weak_entry_count(), 1); + drop(new_lease); + assert_eq!(manager.registry_count(), 0); + assert_eq!(manager.weak_entry_count(), 0); + } + } + + #[test] + fn registry_manager_old_drop_cannot_remove_replacement_registry() { + let manager = Arc::new(DynFilterRegistryManager::default()); + let query_id = test_query_id(1); + let old_lease = manager.acquire_lease(query_id); + let old_registry = Arc::downgrade(old_lease.registry.as_ref().unwrap()); + + drop(old_lease); + assert_eq!(manager.registry_count(), 0); + assert_eq!(manager.weak_entry_count(), 0); + + let replacement_lease = manager.acquire_lease(query_id); + assert_eq!(manager.registry_count(), 1); + assert_eq!(manager.weak_entry_count(), 1); + + assert!( + manager + .remove_if_dropped_registry(&query_id, &old_registry) + .is_none(), + "old registry cleanup must not remove the replacement weak entry" + ); + assert_eq!(manager.registry_count(), 1); + assert_eq!(manager.weak_entry_count(), 1); + + drop(replacement_lease); + assert_eq!(manager.registry_count(), 0); + assert_eq!(manager.weak_entry_count(), 0); } #[test] diff --git a/src/query/src/metrics.rs b/src/query/src/metrics.rs index feea72e34d..b1483faea2 100644 --- a/src/query/src/metrics.rs +++ b/src/query/src/metrics.rs @@ -395,7 +395,7 @@ mod tests { use table::table_name::TableName; use super::*; - use crate::dist_plan::ProducerScopeId; + use crate::dist_plan::RemoteDynFilterProducerId; use crate::options::{FLOW_RETURN_REGION_SEQ, FLOW_SINK_TABLE_ID}; use crate::region_query::RegionQueryHandler; @@ -459,7 +459,7 @@ mod tests { query_ctx, 1, BTreeMap::>::new(), - ProducerScopeId::new(0), + Some(RemoteDynFilterProducerId::new(0)), ) .unwrap(), ) diff --git a/src/query/src/optimizer/pass_distribution.rs b/src/query/src/optimizer/pass_distribution.rs index ca1ff0628c..e9cd5f028d 100644 --- a/src/query/src/optimizer/pass_distribution.rs +++ b/src/query/src/optimizer/pass_distribution.rs @@ -154,7 +154,7 @@ mod tests { use table::table_name::TableName; use super::*; - use crate::dist_plan::ProducerScopeId; + use crate::dist_plan::RemoteDynFilterProducerId; use crate::error::Result as QueryResult; use crate::region_query::RegionQueryHandler; @@ -292,7 +292,7 @@ mod tests { QueryContext::arc(), 32, partition_cols, - ProducerScopeId::new(1), + Some(RemoteDynFilterProducerId::new(1)), ) .unwrap() } diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index fee2f6f2d8..ad2ef92bb9 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -57,7 +57,7 @@ use table::table::adapter::DfTableProviderAdapter; use crate::QueryEngineContext; use crate::dist_plan::{ DistExtensionPlanner, DistPlannerAnalyzer, DistPlannerOptions, DynFilterRegistryManager, - MergeSortExtensionPlanner, QueryDynFilterRegistry, + MergeSortExtensionPlanner, RemoteDynFilterRegistryLease, }; use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES}; use crate::optimizer::ExtensionAnalyzerRule; @@ -404,12 +404,16 @@ impl QueryEngineState { self.dyn_filter_registry_manager.clone() } - pub fn get_or_init_remote_dyn_filter_registry( + pub fn acquire_remote_dyn_filter_registry_lease( &self, query_ctx: &QueryContextRef, - ) -> Option> { + ) -> Option { let query_id = query_ctx.remote_query_id_value()?; - Some(self.dyn_filter_registry_manager.get_or_init(query_id)) + Some( + self.dyn_filter_registry_manager + .clone() + .acquire_lease(query_id), + ) } pub fn function_state(&self) -> Arc { @@ -617,20 +621,23 @@ mod tests { } #[test] - fn query_engine_state_reuses_query_scoped_dyn_filter_registry() { + fn query_engine_state_reuses_query_scoped_dyn_filter_registry_lease() { let state = new_query_engine_state(); let query_ctx = QueryContext::arc(); let first = state - .get_or_init_remote_dyn_filter_registry(&query_ctx) + .acquire_remote_dyn_filter_registry_lease(&query_ctx) .unwrap(); let second = state - .get_or_init_remote_dyn_filter_registry(&query_ctx) + .acquire_remote_dyn_filter_registry_lease(&query_ctx) .unwrap(); - assert!(Arc::ptr_eq(&first, &second)); + assert!(first.ptr_eq(&second)); assert_eq!(state.dyn_filter_registry_manager().registry_count(), 1); - assert_eq!(first.query_id(), query_ctx.remote_query_id_value().unwrap()); + assert_eq!( + first.registry().query_id(), + query_ctx.remote_query_id_value().unwrap() + ); } #[test] @@ -640,12 +647,12 @@ mod tests { assert!(query_ctx.remote_query_id_value().is_some()); - let registry = state - .get_or_init_remote_dyn_filter_registry(&query_ctx) + let lease = state + .acquire_remote_dyn_filter_registry_lease(&query_ctx) .unwrap(); assert_eq!( - registry.query_id(), + lease.registry().query_id(), query_ctx.remote_query_id_value().unwrap() ); assert_eq!(state.dyn_filter_registry_manager().registry_count(), 1); @@ -658,20 +665,20 @@ mod tests { let second_query_ctx = QueryContext::arc(); let first = state - .get_or_init_remote_dyn_filter_registry(&first_query_ctx) + .acquire_remote_dyn_filter_registry_lease(&first_query_ctx) .unwrap(); let second = state - .get_or_init_remote_dyn_filter_registry(&second_query_ctx) + .acquire_remote_dyn_filter_registry_lease(&second_query_ctx) .unwrap(); - assert!(!Arc::ptr_eq(&first, &second)); + assert!(!first.ptr_eq(&second)); assert_eq!(state.dyn_filter_registry_manager().registry_count(), 2); assert_eq!( - first.query_id(), + first.registry().query_id(), first_query_ctx.remote_query_id_value().unwrap() ); assert_eq!( - second.query_id(), + second.registry().query_id(), second_query_ctx.remote_query_id_value().unwrap() ); }