refactor: per review

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-06-03 21:12:28 +08:00
parent 583972b32e
commit bbf289d89f
15 changed files with 733 additions and 408 deletions

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod base64_serde;
mod initial_remote_dyn_filter_reg;
use std::sync::Arc;
@@ -56,31 +57,7 @@ pub const DYN_FILTER_PROTOCOL_VERSION: u32 = 1;
pub enum DynFilterPayload {
/// A serialized DataFusion [`PhysicalExpr`] encoded as a protobuf
/// [`PhysicalExprNode`].
Datafusion(#[serde(with = "base64_bytes")] Vec<u8>),
}
mod base64_bytes {
use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serializer};
pub fn serialize<S>(bytes: &[u8], serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&BASE64_STANDARD.encode(bytes))
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
let encoded = String::deserialize(deserializer)?;
BASE64_STANDARD.decode(encoded).map_err(|err| {
D::Error::custom(format!("invalid base64 dynamic filter payload: {err}"))
})
}
Datafusion(#[serde(with = "base64_serde::bytes")] Vec<u8>),
}
impl DynFilterPayload {

View File

@@ -0,0 +1,78 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Serde adapters for byte fields encoded as base64 strings in JSON.
use base64::Engine;
use base64::prelude::BASE64_STANDARD;
fn encode(bytes: &[u8]) -> String {
BASE64_STANDARD.encode(bytes)
}
fn decode(encoded: &str) -> Result<Vec<u8>, base64::DecodeError> {
BASE64_STANDARD.decode(encoded)
}
/// Serde `with` adapter for a single byte buffer stored as one base64 string.
pub(crate) mod bytes {
    use serde::de::Error;
    use serde::{Deserialize, Deserializer, Serializer};

    /// Serializes `value` as a base64-encoded string.
    pub fn serialize<S>(value: &[u8], serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let text = super::encode(value);
        serializer.serialize_str(&text)
    }

    /// Deserializes a string and base64-decodes it into bytes.
    ///
    /// A string that fails to decode is reported as a custom serde error.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
    where
        D: Deserializer<'de>,
    {
        let text = String::deserialize(deserializer)?;
        match super::decode(&text) {
            Ok(raw) => Ok(raw),
            Err(err) => Err(D::Error::custom(format!(
                "invalid base64 dynamic filter payload: {err}"
            ))),
        }
    }
}
/// Serde `with` adapter for a list of byte buffers stored as a sequence of base64 strings.
pub(crate) mod bytes_vec {
    use serde::de::Error;
    use serde::{Deserialize, Deserializer, Serializer};

    /// Serializes each buffer in `values` as a base64 string, emitted as one sequence.
    ///
    /// The encoded items are streamed straight into the serializer, so no intermediate
    /// `Vec<String>` is built.
    pub fn serialize<S>(values: &[Vec<u8>], serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // The closure is required: `encode` takes `&[u8]`, the iterator yields `&Vec<u8>`.
        serializer.collect_seq(values.iter().map(|bytes| super::encode(bytes)))
    }

    /// Deserializes a sequence of strings and base64-decodes every item.
    ///
    /// Decoding stops at the first invalid item, which is reported as a custom serde error.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<Vec<u8>>, D::Error>
    where
        D: Deserializer<'de>,
    {
        Vec::<String>::deserialize(deserializer)?
            .into_iter()
            .map(|encoded| {
                super::decode(&encoded).map_err(|error| {
                    D::Error::custom(format!("invalid base64 bytes vector item: {error}"))
                })
            })
            .collect()
    }
}

View File

@@ -110,6 +110,7 @@ impl InitialDynFilterRegs {
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct InitialDynFilterReg {
pub filter_id: String,
#[serde(with = "super::base64_serde::bytes_vec")]
pub child_exprs_datafusion_proto: Vec<Vec<u8>>,
/// Optional producer-side predicate snapshot captured at initial registration time.
///
@@ -227,8 +228,17 @@ mod tests {
]);
let encoded = regs.to_extension_value().unwrap();
let json: serde_json::Value = serde_json::from_str(&encoded).unwrap();
let decoded = InitialDynFilterRegs::from_extension_value(&encoded).unwrap();
assert_eq!(
json["registrations"][0]["child_exprs_datafusion_proto"],
serde_json::json!(["AQID"])
);
assert_eq!(
json["registrations"][1]["child_exprs_datafusion_proto"],
serde_json::json!(["BAU="])
);
assert_eq!(decoded, regs);
}
@@ -241,8 +251,17 @@ mod tests {
]);
let encoded = regs.to_extension_value().unwrap();
let json: serde_json::Value = serde_json::from_str(&encoded).unwrap();
let decoded = InitialDynFilterRegs::from_extension_value(&encoded).unwrap();
assert_eq!(
json["registrations"][0]["child_exprs_datafusion_proto"],
serde_json::json!(["AQID"])
);
assert_eq!(
json["registrations"][0]["initial_snapshot"]["payload"],
serde_json::json!({"kind":"datafusion","payload":"BAUG"})
);
assert_eq!(decoded, regs);
assert_eq!(
decoded.regs[0]
@@ -264,7 +283,7 @@ mod tests {
#[test]
fn initial_dyn_filter_reg_json_defaults_missing_snapshot_to_none() {
let decoded = InitialDynFilterRegs::from_extension_value(
r#"{"registrations":[{"filter_id":"filter-a","child_exprs_datafusion_proto":[[1,2,3]]}]}"#,
r#"{"registrations":[{"filter_id":"filter-a","child_exprs_datafusion_proto":["AQID"]}]}"#,
)
.unwrap();

View File

@@ -24,7 +24,7 @@ mod region_pruner;
mod remote_dyn_filter_registry;
pub use analyzer::{DistPlannerAnalyzer, DistPlannerOptions};
pub use filter_id::{FilterFingerprint, FilterId, ParseFilterIdError, ProducerScopeId};
pub use filter_id::{FilterFingerprint, FilterId, ParseFilterIdError, RemoteDynFilterProducerId};
pub use merge_scan::{MergeScanExec, MergeScanLogicalPlan};
pub use planner::{DistExtensionPlanner, MergeSortExtensionPlanner};
pub use predicate_extractor::PredicateExtractor;

View File

@@ -14,7 +14,6 @@
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use common_telemetry::debug;
use datafusion::config::{ConfigExtension, ExtensionOptions};
@@ -32,7 +31,7 @@ use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter;
use crate::dist_plan::ProducerScopeId;
use crate::dist_plan::RemoteDynFilterProducerId;
use crate::dist_plan::analyzer::utils::{
PatchOptimizerContext, PlanTreeExpressionSimplifier, aliased_columns_for,
rewrite_merge_sort_exprs,
@@ -124,15 +123,14 @@ impl AnalyzerRule for DistPlannerAnalyzer {
let opt = config.extensions.get::<DistPlannerOptions>();
let allow_fallback = opt.map(|o| o.allow_query_fallback).unwrap_or(false);
let producer_scope_allocator = ProducerScopeAllocator::default();
let result = match self.try_push_down(plan.clone(), producer_scope_allocator.clone()) {
let result = match self.try_push_down(plan.clone()) {
Ok(plan) => plan,
Err(err) => {
if allow_fallback {
common_telemetry::warn!(err; "Failed to push down plan, using fallback plan rewriter for plan: {plan}");
// if push down failed, use fallback plan rewriter
PUSH_DOWN_FALLBACK_ERRORS_TOTAL.inc();
self.use_fallback(plan, producer_scope_allocator)?
self.use_fallback(plan)?
} else {
return Err(err);
}
@@ -145,37 +143,21 @@ impl AnalyzerRule for DistPlannerAnalyzer {
impl DistPlannerAnalyzer {
/// Try push down as many nodes as possible
fn try_push_down(
&self,
plan: LogicalPlan,
producer_scope_allocator: ProducerScopeAllocator,
) -> DfResult<LogicalPlan> {
let plan = plan.transform(|plan| {
Self::inspect_plan_with_subquery(plan, producer_scope_allocator.clone())
})?;
let mut rewriter = PlanRewriter::new(producer_scope_allocator);
fn try_push_down(&self, plan: LogicalPlan) -> DfResult<LogicalPlan> {
let plan = plan.transform(&Self::inspect_plan_with_subquery)?;
let mut rewriter = PlanRewriter::default();
let result = plan.data.rewrite(&mut rewriter)?.data;
Ok(result)
Self::assign_merge_scan_remote_dyn_filter_producer_ids(result)
}
/// Use fallback plan rewriter to rewrite the plan and only push down table scan nodes
fn use_fallback(
&self,
plan: LogicalPlan,
producer_scope_allocator: ProducerScopeAllocator,
) -> DfResult<LogicalPlan> {
let plan = plan.transform(|plan| {
Self::inspect_plan_with_subquery(plan, producer_scope_allocator.clone())
})?;
let mut rewriter = fallback::FallbackPlanRewriter::new(producer_scope_allocator);
let result = plan.data.rewrite(&mut rewriter)?.data;
Ok(result)
fn use_fallback(&self, plan: LogicalPlan) -> DfResult<LogicalPlan> {
    // Rewrite with the fallback rewriter first, then hand the rewritten plan to the
    // producer-id assignment pass.
    let rewritten = plan.rewrite(&mut fallback::FallbackPlanRewriter)?.data;
    Self::assign_merge_scan_remote_dyn_filter_producer_ids(rewritten)
}
fn inspect_plan_with_subquery(
plan: LogicalPlan,
producer_scope_allocator: ProducerScopeAllocator,
) -> DfResult<Transformed<LogicalPlan>> {
fn inspect_plan_with_subquery(plan: LogicalPlan) -> DfResult<Transformed<LogicalPlan>> {
// Workaround for https://github.com/GreptimeTeam/greptimedb/issues/5469 and https://github.com/GreptimeTeam/greptimedb/issues/5799
// FIXME(yingwen): Remove the `Limit` plan once we update DataFusion.
if let LogicalPlan::Limit(_) | LogicalPlan::Distinct(_) = &plan {
@@ -185,10 +167,7 @@ impl DistPlannerAnalyzer {
let exprs = plan
.expressions_consider_join()
.into_iter()
.map(|e| {
e.transform(|expr| Self::transform_subquery(expr, producer_scope_allocator.clone()))
.map(|x| x.data)
})
.map(|e| e.transform(&Self::transform_subquery).map(|x| x.data))
.collect::<DfResult<Vec<_>>>()?;
// Some plans that are special treated (should not call `with_new_exprs` on them)
@@ -200,37 +179,33 @@ impl DistPlannerAnalyzer {
}
}
fn transform_subquery(
expr: Expr,
producer_scope_allocator: ProducerScopeAllocator,
) -> DfResult<Transformed<Expr>> {
fn transform_subquery(expr: Expr) -> DfResult<Transformed<Expr>> {
match expr {
Expr::Exists(exists) => Ok(Transformed::yes(Expr::Exists(Exists {
subquery: Self::handle_subquery(exists.subquery, producer_scope_allocator)?,
subquery: Self::handle_subquery(exists.subquery)?,
negated: exists.negated,
}))),
Expr::InSubquery(in_subquery) => Ok(Transformed::yes(Expr::InSubquery(InSubquery {
expr: in_subquery.expr,
subquery: Self::handle_subquery(in_subquery.subquery, producer_scope_allocator)?,
subquery: Self::handle_subquery(in_subquery.subquery)?,
negated: in_subquery.negated,
}))),
Expr::ScalarSubquery(scalar_subquery) => Ok(Transformed::yes(Expr::ScalarSubquery(
Self::handle_subquery(scalar_subquery, producer_scope_allocator)?,
Self::handle_subquery(scalar_subquery)?,
))),
_ => Ok(Transformed::no(expr)),
}
}
fn handle_subquery(
subquery: Subquery,
producer_scope_allocator: ProducerScopeAllocator,
) -> DfResult<Subquery> {
let subquery_plan = subquery.subquery.as_ref().clone().transform(|plan| {
Self::inspect_plan_with_subquery(plan, producer_scope_allocator.clone())
})?;
let mut rewriter = PlanRewriter::new(producer_scope_allocator);
let mut rewrote_subquery = subquery_plan.data.rewrite(&mut rewriter)?.data;
fn handle_subquery(subquery: Subquery) -> DfResult<Subquery> {
let mut rewriter = PlanRewriter::default();
let mut rewrote_subquery = subquery
.subquery
.as_ref()
.clone()
.rewrite(&mut rewriter)?
.data;
// Workaround. DF doesn't support the first plan in subquery to be an Extension
if matches!(rewrote_subquery, LogicalPlan::Extension(_)) {
let output_schema = rewrote_subquery.schema().clone();
@@ -250,20 +225,56 @@ impl DistPlannerAnalyzer {
spans: Default::default(),
})
}
/// Runs a fresh [`MergeScanRemoteDynFilterProducerIdAssigner`] over `plan` via
/// `rewrite_with_subqueries` and returns the rewritten plan.
fn assign_merge_scan_remote_dyn_filter_producer_ids(
    plan: LogicalPlan,
) -> DfResult<LogicalPlan> {
    let mut id_assigner = MergeScanRemoteDynFilterProducerIdAssigner::default();
    let rewritten = plan.rewrite_with_subqueries(&mut id_assigner)?;
    Ok(rewritten.data)
}
}
#[derive(Debug, Clone, Default)]
pub(super) struct ProducerScopeAllocator {
next_remote_dyn_filter_producer_scope_id: Arc<AtomicU64>,
/// Sequential allocator for [`RemoteDynFilterProducerId`] values.
#[derive(Debug, Default)]
struct RemoteDynFilterProducerIdAllocator {
// Most recently allocated id; `allocate` increments it before handing it out, so a
// default-constructed allocator (counter at 0) yields 1 first.
next_remote_dyn_filter_producer_id: u64,
}
impl ProducerScopeAllocator {
fn allocate(&self) -> ProducerScopeId {
ProducerScopeId::new(
self.next_remote_dyn_filter_producer_scope_id
.fetch_add(1, Ordering::SeqCst)
+ 1,
)
impl RemoteDynFilterProducerIdAllocator {
    /// Advances the internal counter by one and returns the new value as a producer id.
    fn allocate(&mut self) -> RemoteDynFilterProducerId {
        let next_id = self.next_remote_dyn_filter_producer_id + 1;
        self.next_remote_dyn_filter_producer_id = next_id;
        RemoteDynFilterProducerId::new(next_id)
    }
}
/// Assigns query-local RDF producer ids to visible `MergeScan` nodes after plan rewriting.
///
/// One assigner owns one allocator, so every id handed out during a single rewrite pass
/// comes from the same counter.
#[derive(Debug, Default)]
struct MergeScanRemoteDynFilterProducerIdAssigner {
// Source of the ids attached to each `MergeScanLogicalPlan` encountered in `f_up`.
remote_dyn_filter_producer_id_allocator: RemoteDynFilterProducerIdAllocator,
}
impl TreeNodeRewriter for MergeScanRemoteDynFilterProducerIdAssigner {
type Node = LogicalPlan;
fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
let LogicalPlan::Extension(extension) = &node else {
return Ok(Transformed::no(node));
};
let Some(merge_scan) = extension
.node
.as_any()
.downcast_ref::<MergeScanLogicalPlan>()
else {
return Ok(Transformed::no(node));
};
Ok(Transformed::yes(
merge_scan
.clone()
.with_remote_dyn_filter_producer_id(
self.remote_dyn_filter_producer_id_allocator.allocate(),
)
.into_logical_plan(),
))
}
}
@@ -330,21 +341,9 @@ struct PlanRewriter {
/// so that we can push down the `Sort` plan as much as possible.
expand_on_next_part_cond_trans_commutative: bool,
new_child_plan: Option<LogicalPlan>,
producer_scope_allocator: ProducerScopeAllocator,
}
impl PlanRewriter {
fn new(producer_scope_allocator: ProducerScopeAllocator) -> Self {
Self {
producer_scope_allocator,
..Default::default()
}
}
fn allocate_remote_dyn_filter_producer_scope_id(&self) -> ProducerScopeId {
self.producer_scope_allocator.allocate()
}
fn get_parent(&self) -> Option<&LogicalPlan> {
// level starts from 1, it's safe to minus by 1
self.stack
@@ -639,14 +638,12 @@ impl PlanRewriter {
);
// add merge scan as the new root
let producer_scope_id = self.allocate_remote_dyn_filter_producer_scope_id();
let mut node = MergeScanLogicalPlan::new(
on_node.clone(),
false,
// at this stage, the partition cols should be set
// treat it as non-partitioned if None
self.partition_cols.clone().unwrap_or_default(),
producer_scope_id,
)
.into_logical_plan();

View File

@@ -27,31 +27,15 @@ use datafusion_expr::LogicalPlan;
use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter;
use crate::dist_plan::analyzer::{
AliasMapping, OTHER_PHY_PART_COL_PLACEHOLDER, ProducerScopeAllocator,
};
use crate::dist_plan::{MergeScanLogicalPlan, ProducerScopeId};
use crate::dist_plan::MergeScanLogicalPlan;
use crate::dist_plan::analyzer::{AliasMapping, OTHER_PHY_PART_COL_PLACEHOLDER};
/// `FallbackPlanRewriter` is a plan rewriter that only pushes down table scan nodes.
/// It is used when `PlanRewriter` produces errors while trying to rewrite the plan.
/// This is a temporary solution and will be removed once we have a more robust plan rewriter.
/// It traverses the logical plan and rewrites each table scan node into a merge scan node.
#[derive(Debug, Clone, Default)]
pub struct FallbackPlanRewriter {
producer_scope_allocator: ProducerScopeAllocator,
}
impl FallbackPlanRewriter {
pub fn new(producer_scope_allocator: ProducerScopeAllocator) -> Self {
Self {
producer_scope_allocator,
}
}
fn allocate_remote_dyn_filter_producer_scope_id(&self) -> ProducerScopeId {
self.producer_scope_allocator.allocate()
}
}
pub struct FallbackPlanRewriter;
impl TreeNodeRewriter for FallbackPlanRewriter {
type Node = LogicalPlan;
@@ -121,7 +105,6 @@ impl TreeNodeRewriter for FallbackPlanRewriter {
// at this stage, the partition cols should be set
// treat it as non-partitioned if None
partition_cols.clone().unwrap_or_default(),
self.allocate_remote_dyn_filter_producer_scope_id(),
)
.into_logical_plan();
Ok(Transformed::yes(node))

View File

@@ -30,7 +30,7 @@ use datafusion::functions_aggregate::expr_fn::avg;
use datafusion::functions_aggregate::min_max::{max, min};
use datafusion::prelude::SessionContext;
use datafusion_common::{JoinType, ScalarValue};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::expr::{Exists, ScalarFunction};
use datafusion_expr::{
AggregateUDF, Expr, ExprSchemable as _, LogicalPlanBuilder, Operator, Subquery, binary_expr,
col, lit,
@@ -55,40 +55,48 @@ use table::{Table, TableRef};
use super::*;
fn collect_merge_scan_producer_scope_ids(
fn collect_merge_scan_remote_dyn_filter_producer_ids(
plan: &LogicalPlan,
scopes: &mut BTreeSet<ProducerScopeId>,
producer_ids: &mut BTreeSet<RemoteDynFilterProducerId>,
) {
if let LogicalPlan::Extension(extension) = plan
&& let Some(merge_scan) = extension
.node
.as_any()
.downcast_ref::<MergeScanLogicalPlan>()
{
scopes.insert(merge_scan.producer_scope_id());
}
let mut producer_id_list = Vec::new();
collect_merge_scan_remote_dyn_filter_producer_id_list(plan, &mut producer_id_list);
producer_ids.extend(producer_id_list);
}
for input in plan.inputs() {
collect_merge_scan_producer_scope_ids(input, scopes);
/// Test helper that records the producer id of every `MergeScanLogicalPlan` it visits.
struct MergeScanRemoteDynFilterProducerIdCollector<'a> {
// Caller-owned output list; ids are appended in visit order.
producer_ids: &'a mut Vec<RemoteDynFilterProducerId>,
}
impl TreeNodeRewriter for MergeScanRemoteDynFilterProducerIdCollector<'_> {
    type Node = LogicalPlan;

    /// Appends the producer id of each `MergeScanLogicalPlan` node to the output list.
    ///
    /// The plan is never modified. Panics if a merge scan has no producer id assigned.
    fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
        let LogicalPlan::Extension(extension) = &node else {
            return Ok(Transformed::no(node));
        };
        let Some(merge_scan) = extension
            .node
            .as_any()
            .downcast_ref::<MergeScanLogicalPlan>()
        else {
            return Ok(Transformed::no(node));
        };

        let producer_id = merge_scan
            .remote_dyn_filter_producer_id()
            .expect("MergeScan remote dynamic filter producer id must be assigned");
        self.producer_ids.push(producer_id);
        Ok(Transformed::no(node))
    }
}
fn collect_merge_scan_producer_scope_id_list(
fn collect_merge_scan_remote_dyn_filter_producer_id_list(
plan: &LogicalPlan,
scopes: &mut Vec<ProducerScopeId>,
producer_ids: &mut Vec<RemoteDynFilterProducerId>,
) {
if let LogicalPlan::Extension(extension) = plan
&& let Some(merge_scan) = extension
.node
.as_any()
.downcast_ref::<MergeScanLogicalPlan>()
{
scopes.push(merge_scan.producer_scope_id());
}
for input in plan.inputs() {
collect_merge_scan_producer_scope_id_list(input, scopes);
}
let _ = plan
.clone()
.rewrite_with_subqueries(&mut MergeScanRemoteDynFilterProducerIdCollector { producer_ids })
.unwrap();
}
pub(crate) struct TestTable;
@@ -1399,7 +1407,7 @@ fn test_simplify_select_now_expression() {
}
#[test]
fn sibling_merge_scan_producers_have_unique_scope_ids() {
fn sibling_merge_scans_have_unique_remote_dyn_filter_producer_ids() {
init_default_ut_logging();
let left_table = TestTable::table_with_name(0, "left_table".to_string());
let right_table = TestTable::table_with_name(1, "right_table".to_string());
@@ -1436,19 +1444,19 @@ fn sibling_merge_scan_producers_have_unique_scope_ids() {
let config = ConfigOptions::default();
let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
let mut scopes = Vec::new();
collect_merge_scan_producer_scope_id_list(&result, &mut scopes);
let unique_scopes = scopes.iter().copied().collect::<BTreeSet<_>>();
let mut producer_ids = Vec::new();
collect_merge_scan_remote_dyn_filter_producer_id_list(&result, &mut producer_ids);
let unique_producer_ids = producer_ids.iter().copied().collect::<BTreeSet<_>>();
assert!(
scopes.len() >= 2,
"Expected at least 2 ProducerScopeIds, got {}: {scopes:?}",
scopes.len()
producer_ids.len() >= 2,
"Expected at least 2 RemoteDynFilterProducerIds, got {}: {producer_ids:?}",
producer_ids.len()
);
assert_eq!(
scopes.len(),
unique_scopes.len(),
"Expected all sibling ProducerScopeIds to be unique, got scopes: {scopes:?}"
producer_ids.len(),
unique_producer_ids.len(),
"Expected all sibling RemoteDynFilterProducerIds to be unique, got ids: {producer_ids:?}"
);
}
@@ -1916,7 +1924,7 @@ fn transform_sort_subquery_alias() {
}
#[test]
fn producer_scope_ids_do_not_collide_between_subquery_and_outer_plan() {
fn remote_dyn_filter_producer_ids_do_not_collide_between_subquery_and_outer_plan() {
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
@@ -1931,28 +1939,21 @@ fn producer_scope_ids_do_not_collide_between_subquery_and_outer_plan() {
outer_ref_columns: Default::default(),
spans: Default::default(),
};
let allocator = ProducerScopeAllocator::default();
let rewritten_subquery =
DistPlannerAnalyzer::handle_subquery(subquery, allocator.clone()).unwrap();
let outer_plan = LogicalPlanBuilder::scan_with_filters("outer", table_source, None, vec![])
.unwrap()
.filter(Expr::Exists(Exists {
subquery,
negated: false,
}))
.unwrap()
.build()
.unwrap();
let rewritten_outer = DistPlannerAnalyzer {}
.try_push_down(outer_plan, allocator)
.unwrap();
let rewritten = DistPlannerAnalyzer {}.try_push_down(outer_plan).unwrap();
let mut subquery_scopes = BTreeSet::new();
collect_merge_scan_producer_scope_ids(
rewritten_subquery.subquery.as_ref(),
&mut subquery_scopes,
);
let mut outer_scopes = BTreeSet::new();
collect_merge_scan_producer_scope_ids(&rewritten_outer, &mut outer_scopes);
let mut producer_ids = BTreeSet::new();
collect_merge_scan_remote_dyn_filter_producer_ids(&rewritten, &mut producer_ids);
assert_eq!(subquery_scopes.len(), 1);
assert_eq!(outer_scopes.len(), 1);
assert_ne!(subquery_scopes, outer_scopes);
assert_eq!(producer_ids.len(), 2);
}
#[test]

View File

@@ -20,7 +20,6 @@ use common_query::request::{
INITIAL_REMOTE_DYN_FILTER_REGS_MAX_TOTAL_PROTO_BYTES, InitialDynFilterReg,
InitialDynFilterRegs, InitialDynFilterSnapshot,
};
use datafusion::execution::TaskContext;
use datafusion_common::Result;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
@@ -28,8 +27,7 @@ use session::context::{QueryContext, QueryContextRef};
use store_api::storage::RegionId;
use crate::dist_plan::filter_id::build_remote_dyn_filter_id;
use crate::dist_plan::{FilterId, ProducerScopeId, QueryDynFilterRegistry, Subscriber};
use crate::query_engine::QueryEngineState;
use crate::dist_plan::{FilterId, QueryDynFilterRegistry, RemoteDynFilterProducerId, Subscriber};
#[derive(Debug, Clone)]
pub(crate) struct CapturedDynFilter {
@@ -48,7 +46,7 @@ pub(crate) struct RemoteDynFilterPushdown {
}
pub(crate) fn capture_remote_dyn_filters_for_pushdown(
producer_scope_id: ProducerScopeId,
remote_dyn_filter_producer_id: RemoteDynFilterProducerId,
parent_filters: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
) -> RemoteDynFilterPushdown {
let mut pushed_down = Vec::with_capacity(parent_filters.len());
@@ -60,8 +58,11 @@ pub(crate) fn capture_remote_dyn_filters_for_pushdown(
continue;
};
match build_captured_dyn_filter(producer_scope_id, producer_local_ordinal, alive_dyn_filter)
{
match build_captured_dyn_filter(
remote_dyn_filter_producer_id,
producer_local_ordinal,
alive_dyn_filter,
) {
Ok(captured_dyn_filter) => {
pushed_down.push(true);
captured_dyn_filters.push(captured_dyn_filter);
@@ -106,10 +107,6 @@ fn downcast_dynamic_filter(
.ok()
}
fn query_engine_state_from_task_context(context: &TaskContext) -> Option<Arc<QueryEngineState>> {
context.session_config().get_extension()
}
pub(crate) fn register_dyn_filters_for_region(
registry: &QueryDynFilterRegistry,
region_id: RegionId,
@@ -125,29 +122,8 @@ pub(crate) fn register_dyn_filters_for_region(
}
}
pub(crate) fn bridge_dyn_filters_for_region(
context: &TaskContext,
query_ctx: &QueryContextRef,
region_id: RegionId,
captured_dyn_filters: &[CapturedDynFilter],
) {
if captured_dyn_filters.is_empty() {
return;
}
let Some(query_engine_state) = query_engine_state_from_task_context(context) else {
return;
};
let Some(registry) = query_engine_state.get_or_init_remote_dyn_filter_registry(query_ctx)
else {
return;
};
register_dyn_filters_for_region(&registry, region_id, captured_dyn_filters);
}
fn build_captured_dyn_filter(
producer_scope_id: ProducerScopeId,
remote_dyn_filter_producer_id: RemoteDynFilterProducerId,
producer_local_ordinal: usize,
alive_dyn_filter: Arc<DynamicFilterPhysicalExpr>,
) -> Result<CapturedDynFilter> {
@@ -156,8 +132,11 @@ fn build_captured_dyn_filter(
.into_iter()
.cloned()
.collect::<Vec<_>>();
let filter_id =
build_remote_dyn_filter_id(producer_scope_id, producer_local_ordinal, &children)?;
let filter_id = build_remote_dyn_filter_id(
remote_dyn_filter_producer_id,
producer_local_ordinal,
&children,
)?;
let initial_registration =
InitialDynFilterReg::from_filter_id_and_children(filter_id.to_string(), &children)?;
@@ -295,6 +274,7 @@ mod tests {
use std::fmt;
use std::hash::{Hash, Hasher};
use datafusion::execution::TaskContext;
use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr::expressions::{Column, lit};
@@ -372,18 +352,18 @@ mod tests {
QueryId::from(Uuid::from_u128(value))
}
fn test_producer_scope(value: u64) -> ProducerScopeId {
ProducerScopeId::new(value)
fn test_remote_dyn_filter_producer_id(value: u64) -> RemoteDynFilterProducerId {
RemoteDynFilterProducerId::new(value)
}
fn test_captured_dyn_filter(
producer_scope_id: ProducerScopeId,
remote_dyn_filter_producer_id: RemoteDynFilterProducerId,
producer_local_ordinal: usize,
column_name: &str,
column_index: usize,
) -> CapturedDynFilter {
build_captured_dyn_filter(
producer_scope_id,
remote_dyn_filter_producer_id,
producer_local_ordinal,
Arc::new(DynamicFilterPhysicalExpr::new(
vec![Arc::new(Column::new(column_name, column_index)) as Arc<_>],
@@ -423,13 +403,20 @@ mod tests {
)) as Arc<dyn datafusion::physical_plan::PhysicalExpr>,
];
let producer_scope_id = test_producer_scope(42);
let captured = capture_remote_dyn_filters_for_pushdown(producer_scope_id, parent_filters)
.captured_dyn_filters;
let remote_dyn_filter_producer_id = test_remote_dyn_filter_producer_id(42);
let captured =
capture_remote_dyn_filters_for_pushdown(remote_dyn_filter_producer_id, parent_filters)
.captured_dyn_filters;
assert_eq!(captured.len(), 2);
assert_eq!(captured[0].filter_id.producer_scope_id(), producer_scope_id);
assert_eq!(captured[1].filter_id.producer_scope_id(), producer_scope_id);
assert_eq!(
captured[0].filter_id.remote_dyn_filter_producer_id(),
remote_dyn_filter_producer_id
);
assert_eq!(
captured[1].filter_id.remote_dyn_filter_producer_id(),
remote_dyn_filter_producer_id
);
assert_eq!(captured[0].filter_id.producer_ordinal(), 1);
assert_eq!(captured[1].filter_id.producer_ordinal(), 3);
}
@@ -445,16 +432,17 @@ mod tests {
Arc::new(Column::new("zone", 2)) as Arc<dyn datafusion::physical_plan::PhysicalExpr>,
];
let producer_scope_id = test_producer_scope(42);
let pushdown = capture_remote_dyn_filters_for_pushdown(producer_scope_id, parent_filters);
let remote_dyn_filter_producer_id = test_remote_dyn_filter_producer_id(42);
let pushdown =
capture_remote_dyn_filters_for_pushdown(remote_dyn_filter_producer_id, parent_filters);
assert_eq!(pushdown.pushed_down, vec![false, true, false]);
assert_eq!(pushdown.captured_dyn_filters.len(), 1);
assert_eq!(
pushdown.captured_dyn_filters[0]
.filter_id
.producer_scope_id(),
producer_scope_id
.remote_dyn_filter_producer_id(),
remote_dyn_filter_producer_id
);
assert_eq!(
pushdown.captured_dyn_filters[0]
@@ -478,8 +466,10 @@ mod tests {
))
as Arc<dyn datafusion::physical_plan::PhysicalExpr>];
let pushdown =
capture_remote_dyn_filters_for_pushdown(test_producer_scope(42), parent_filters);
let pushdown = capture_remote_dyn_filters_for_pushdown(
test_remote_dyn_filter_producer_id(42),
parent_filters,
);
assert_eq!(pushdown.pushed_down, vec![false]);
assert!(pushdown.captured_dyn_filters.is_empty());
@@ -493,8 +483,10 @@ mod tests {
))
as Arc<dyn datafusion::physical_plan::PhysicalExpr>];
let pushdown =
capture_remote_dyn_filters_for_pushdown(test_producer_scope(42), parent_filters);
let pushdown = capture_remote_dyn_filters_for_pushdown(
test_remote_dyn_filter_producer_id(42),
parent_filters,
);
assert_eq!(pushdown.pushed_down, vec![true]);
assert!(
@@ -514,8 +506,10 @@ mod tests {
dyn_filter.update(lit(false) as _).unwrap();
let parent_filters = vec![dyn_filter as Arc<dyn datafusion::physical_plan::PhysicalExpr>];
let pushdown =
capture_remote_dyn_filters_for_pushdown(test_producer_scope(42), parent_filters);
let pushdown = capture_remote_dyn_filters_for_pushdown(
test_remote_dyn_filter_producer_id(42),
parent_filters,
);
assert_eq!(pushdown.pushed_down, vec![true]);
let snapshot = pushdown.captured_dyn_filters[0]
@@ -540,8 +534,10 @@ mod tests {
as Arc<dyn datafusion::physical_plan::PhysicalExpr>,
];
let pushdown =
capture_remote_dyn_filters_for_pushdown(test_producer_scope(42), parent_filters);
let pushdown = capture_remote_dyn_filters_for_pushdown(
test_remote_dyn_filter_producer_id(42),
parent_filters,
);
assert_eq!(pushdown.pushed_down, vec![true, true]);
assert_eq!(pushdown.captured_dyn_filters.len(), 2);
@@ -564,8 +560,10 @@ mod tests {
})
.collect::<Vec<_>>();
let pushdown =
capture_remote_dyn_filters_for_pushdown(test_producer_scope(42), parent_filters);
let pushdown = capture_remote_dyn_filters_for_pushdown(
test_remote_dyn_filter_producer_id(42),
parent_filters,
);
assert!(pushdown.captured_dyn_filters.is_empty());
assert_eq!(pushdown.pushed_down, vec![false; TOO_MANY_INITIAL_REGS]);
@@ -584,8 +582,10 @@ mod tests {
})
.collect::<Vec<_>>();
let pushdown =
capture_remote_dyn_filters_for_pushdown(test_producer_scope(42), parent_filters);
let pushdown = capture_remote_dyn_filters_for_pushdown(
test_remote_dyn_filter_producer_id(42),
parent_filters,
);
assert!(pushdown.captured_dyn_filters.is_empty());
assert_eq!(pushdown.pushed_down, vec![false; TOO_MANY_INITIAL_REGS]);
@@ -595,7 +595,7 @@ mod tests {
fn register_dyn_filters_for_region_reuses_existing_entry() {
let registry = QueryDynFilterRegistry::new(test_query_id(1));
let captured_dyn_filters = vec![test_captured_dyn_filter(
test_producer_scope(42),
test_remote_dyn_filter_producer_id(42),
2,
"host",
0,
@@ -609,8 +609,8 @@ mod tests {
assert_eq!(registry.entry_count(), 1);
let entry = registry.entries().pop().unwrap();
assert_eq!(
entry.filter_id().producer_scope_id(),
test_producer_scope(42)
entry.filter_id().remote_dyn_filter_producer_id(),
test_remote_dyn_filter_producer_id(42)
);
assert_eq!(entry.filter_id().producer_ordinal(), 2);
let subscribers = entry.subscribers();
@@ -628,21 +628,22 @@ mod tests {
}
#[test]
fn register_dyn_filters_for_region_keeps_independent_producer_scopes_distinct() {
fn register_dyn_filters_for_region_keeps_independent_producer_ids_distinct() {
let registry = QueryDynFilterRegistry::new(test_query_id(1));
let region_id = RegionId::new(1024, 7);
let make_filter =
|producer_scope_id| test_captured_dyn_filter(producer_scope_id, 2, "host", 0);
let make_filter = |remote_dyn_filter_producer_id| {
test_captured_dyn_filter(remote_dyn_filter_producer_id, 2, "host", 0)
};
register_dyn_filters_for_region(
&registry,
region_id,
&[make_filter(test_producer_scope(42))],
&[make_filter(test_remote_dyn_filter_producer_id(42))],
);
register_dyn_filters_for_region(
&registry,
region_id,
&[make_filter(test_producer_scope(43))],
&[make_filter(test_remote_dyn_filter_producer_id(43))],
);
assert_eq!(registry.entry_count(), 2);
@@ -651,7 +652,7 @@ mod tests {
#[test]
fn query_context_includes_region_initial_dyn_filter_regs() {
let captured_dyn_filters = vec![test_captured_dyn_filter(
test_producer_scope(42),
test_remote_dyn_filter_producer_id(42),
2,
"host",
0,
@@ -691,8 +692,8 @@ mod tests {
#[test]
fn query_context_drops_initial_regs_when_duplicate_filter_ids_exceed_bounds() {
let captured_dyn_filters = vec![
test_captured_dyn_filter(test_producer_scope(42), 2, "host", 0),
test_captured_dyn_filter(test_producer_scope(42), 2, "host", 0),
test_captured_dyn_filter(test_remote_dyn_filter_producer_id(42), 2, "host", 0),
test_captured_dyn_filter(test_remote_dyn_filter_producer_id(42), 2, "host", 0),
];
let region_id = RegionId::new(1024, 7);
let query_ctx = QueryContext::arc();

View File

@@ -51,10 +51,15 @@ impl FromStr for FilterFingerprint {
}
}
/// Query-local identity for one remote dynamic filter producer.
///
/// The analyzer assigns a new id to each frontend-side `MergeScan` rewrite so filters from
/// independent producers cannot collide even when their producer-local ordinals and child
/// fingerprints match.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ProducerScopeId(u64);
pub struct RemoteDynFilterProducerId(u64);
impl ProducerScopeId {
impl RemoteDynFilterProducerId {
pub fn new(value: u64) -> Self {
Self(value)
}
@@ -64,13 +69,13 @@ impl ProducerScopeId {
}
}
impl Display for ProducerScopeId {
impl Display for RemoteDynFilterProducerId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:016x}", self.0)
}
}
impl FromStr for ProducerScopeId {
impl FromStr for RemoteDynFilterProducerId {
type Err = ParseIntError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
@@ -80,7 +85,7 @@ impl FromStr for ProducerScopeId {
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct FilterId {
producer_scope_id: ProducerScopeId,
remote_dyn_filter_producer_id: RemoteDynFilterProducerId,
producer_ordinal: u32,
children_fingerprint: FilterFingerprint,
}
@@ -91,12 +96,12 @@ pub struct FilterId {
impl FilterId {
pub fn new(
producer_scope_id: ProducerScopeId,
remote_dyn_filter_producer_id: RemoteDynFilterProducerId,
producer_ordinal: u32,
children_fingerprint: FilterFingerprint,
) -> Self {
Self {
producer_scope_id,
remote_dyn_filter_producer_id,
producer_ordinal,
children_fingerprint,
}
@@ -106,8 +111,8 @@ impl FilterId {
self.producer_ordinal
}
pub fn producer_scope_id(&self) -> ProducerScopeId {
self.producer_scope_id
pub fn remote_dyn_filter_producer_id(&self) -> RemoteDynFilterProducerId {
self.remote_dyn_filter_producer_id
}
pub fn children_fingerprint(&self) -> FilterFingerprint {
@@ -120,7 +125,7 @@ impl Display for FilterId {
write!(
f,
"{}:{}:{}",
self.producer_scope_id, self.producer_ordinal, self.children_fingerprint
self.remote_dyn_filter_producer_id, self.producer_ordinal, self.children_fingerprint
)
}
}
@@ -130,10 +135,10 @@ impl FromStr for FilterId {
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut parts = s.split(':');
let producer_scope_id = parts
let remote_dyn_filter_producer_id = parts
.next()
.ok_or(ParseFilterIdError)?
.parse::<ProducerScopeId>()
.parse::<RemoteDynFilterProducerId>()
.map_err(|_| ParseFilterIdError)?;
let producer_local_ordinal = parts
.next()
@@ -150,7 +155,7 @@ impl FromStr for FilterId {
}
Ok(Self::new(
producer_scope_id,
remote_dyn_filter_producer_id,
producer_local_ordinal,
children_fingerprint,
))
@@ -168,15 +173,12 @@ impl Display for ParseFilterIdError {
/// Builds the query-local remote dynamic filter identity.
///
/// The identity is `producer scope + producer-local ordinal + canonicalized child fingerprint`.
/// The identity is `remote dynamic filter producer id + producer-local ordinal + canonicalized child fingerprint`.
/// Subscriber routing details such as `region_id` and `partition` stay outside this key so they
/// can remain in the later fanout/subscriber map instead of splitting one shared remote filter state.
///
/// NOTE(remote-dyn-filter): This id is generated once on the source side and then propagated.
/// Consumers should reuse the propagated `FilterId` instead of independently recomputing it from
/// local state.
/// See [`FilterId`] for the propagation contract.
pub(crate) fn build_remote_dyn_filter_id(
producer_scope_id: ProducerScopeId,
remote_dyn_filter_producer_id: RemoteDynFilterProducerId,
producer_local_ordinal: usize,
children: &[Arc<dyn PhysicalExpr>],
) -> Result<FilterId> {
@@ -186,7 +188,7 @@ pub(crate) fn build_remote_dyn_filter_id(
DataFusionError::Execution("producer ordinal out of range for filter id".to_string())
})?;
Ok(FilterId::new(
producer_scope_id,
remote_dyn_filter_producer_id,
producer_local_ordinal,
children_fingerprint,
))
@@ -228,7 +230,11 @@ mod tests {
#[test]
fn filter_id_round_trips_through_string() {
let filter_id = FilterId::new(ProducerScopeId::new(42), 3, FilterFingerprint::new(0xabc));
let filter_id = FilterId::new(
RemoteDynFilterProducerId::new(42),
3,
FilterFingerprint::new(0xabc),
);
let encoded = filter_id.to_string();
assert_eq!(encoded, "000000000000002a:3:0000000000000abc");
@@ -249,37 +255,48 @@ mod tests {
#[test]
fn remote_dyn_filter_id_is_stable_for_equivalent_children() {
let producer_scope_id = ProducerScopeId::new(42);
let first =
build_remote_dyn_filter_id(producer_scope_id, 3, &test_children(&["host", "pod"]))
.unwrap();
let second =
build_remote_dyn_filter_id(producer_scope_id, 3, &test_children(&["host", "pod"]))
.unwrap();
let remote_dyn_filter_producer_id = RemoteDynFilterProducerId::new(42);
let first = build_remote_dyn_filter_id(
remote_dyn_filter_producer_id,
3,
&test_children(&["host", "pod"]),
)
.unwrap();
let second = build_remote_dyn_filter_id(
remote_dyn_filter_producer_id,
3,
&test_children(&["host", "pod"]),
)
.unwrap();
assert_eq!(first, second);
}
#[test]
fn remote_dyn_filter_id_changes_when_producer_scope_changes() {
fn remote_dyn_filter_id_changes_when_remote_dyn_filter_producer_changes() {
let children = test_children(&["host", "pod"]);
let baseline = build_remote_dyn_filter_id(ProducerScopeId::new(42), 3, &children).unwrap();
let different_scope =
build_remote_dyn_filter_id(ProducerScopeId::new(43), 3, &children).unwrap();
let baseline =
build_remote_dyn_filter_id(RemoteDynFilterProducerId::new(42), 3, &children).unwrap();
let different_producer_id =
build_remote_dyn_filter_id(RemoteDynFilterProducerId::new(43), 3, &children).unwrap();
assert_ne!(baseline, different_scope);
assert_ne!(baseline, different_producer_id);
}
#[test]
fn remote_dyn_filter_id_changes_when_identity_inputs_change() {
let children = test_children(&["host", "pod"]);
let producer_scope_id = ProducerScopeId::new(42);
let baseline = build_remote_dyn_filter_id(producer_scope_id, 3, &children).unwrap();
let remote_dyn_filter_producer_id = RemoteDynFilterProducerId::new(42);
let baseline =
build_remote_dyn_filter_id(remote_dyn_filter_producer_id, 3, &children).unwrap();
let different_ordinal =
build_remote_dyn_filter_id(producer_scope_id, 4, &children).unwrap();
let different_children =
build_remote_dyn_filter_id(producer_scope_id, 3, &test_children(&["pod", "host"]))
.unwrap();
build_remote_dyn_filter_id(remote_dyn_filter_producer_id, 4, &children).unwrap();
let different_children = build_remote_dyn_filter_id(
remote_dyn_filter_producer_id,
3,
&test_children(&["pod", "host"]),
)
.unwrap();
assert_ne!(baseline, different_ordinal);
assert_ne!(baseline, different_children);
@@ -287,19 +304,22 @@ mod tests {
#[test]
fn remote_dyn_filter_id_supports_empty_children() {
let producer_scope_id = ProducerScopeId::new(42);
let first = build_remote_dyn_filter_id(producer_scope_id, 1, &[]).unwrap();
let second = build_remote_dyn_filter_id(producer_scope_id, 1, &[]).unwrap();
let remote_dyn_filter_producer_id = RemoteDynFilterProducerId::new(42);
let first = build_remote_dyn_filter_id(remote_dyn_filter_producer_id, 1, &[]).unwrap();
let second = build_remote_dyn_filter_id(remote_dyn_filter_producer_id, 1, &[]).unwrap();
assert_eq!(first, second);
assert_eq!(first.producer_scope_id(), producer_scope_id);
assert_eq!(
first.remote_dyn_filter_producer_id(),
remote_dyn_filter_producer_id
);
assert_eq!(first.producer_ordinal(), 1);
assert_eq!(first.children_fingerprint(), second.children_fingerprint());
}
#[test]
fn remote_dyn_filter_id_rejects_out_of_range_producer_ordinal() {
let error = build_remote_dyn_filter_id(ProducerScopeId::new(42), usize::MAX, &[])
let error = build_remote_dyn_filter_id(RemoteDynFilterProducerId::new(42), usize::MAX, &[])
.unwrap_err()
.to_string();

View File

@@ -54,10 +54,10 @@ use tracing::{Instrument, Span};
use crate::dist_plan::analyzer::AliasMapping;
use crate::dist_plan::analyzer::utils::patch_batch_timezone;
use crate::dist_plan::dyn_filter_bridge::{
CapturedDynFilter, bridge_dyn_filters_for_region, capture_remote_dyn_filters_for_pushdown,
query_context_with_initial_dyn_filter_regs,
CapturedDynFilter, capture_remote_dyn_filters_for_pushdown,
query_context_with_initial_dyn_filter_regs, register_dyn_filters_for_region,
};
use crate::dist_plan::{ProducerScopeId, RemoteDynFilterRegistryLease};
use crate::dist_plan::{RemoteDynFilterProducerId, RemoteDynFilterRegistryLease};
use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
use crate::options::FlowQueryExtensions;
use crate::query_engine::QueryEngineState;
@@ -67,7 +67,7 @@ fn query_engine_state_from_task_context(context: &TaskContext) -> Option<Arc<Que
context.session_config().get_extension()
}
fn acquire_remote_dyn_filter_registry_cleanup(
fn acquire_remote_dyn_filter_registry_lease(
context: &TaskContext,
query_ctx: &QueryContextRef,
captured_dyn_filters: &[CapturedDynFilter],
@@ -92,7 +92,8 @@ pub struct MergeScanLogicalPlan {
/// If this plan is a placeholder
is_placeholder: bool,
partition_cols: AliasMapping,
producer_scope_id: ProducerScopeId,
/// Assigned after dist-plan rewriting so rewriters only deal with plan shape.
remote_dyn_filter_producer_id: Option<RemoteDynFilterProducerId>,
}
impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
@@ -133,20 +134,23 @@ impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
}
impl MergeScanLogicalPlan {
pub fn new(
input: LogicalPlan,
is_placeholder: bool,
partition_cols: AliasMapping,
producer_scope_id: ProducerScopeId,
) -> Self {
pub fn new(input: LogicalPlan, is_placeholder: bool, partition_cols: AliasMapping) -> Self {
Self {
input,
is_placeholder,
partition_cols,
producer_scope_id,
remote_dyn_filter_producer_id: None,
}
}
pub(crate) fn with_remote_dyn_filter_producer_id(
mut self,
remote_dyn_filter_producer_id: RemoteDynFilterProducerId,
) -> Self {
self.remote_dyn_filter_producer_id = Some(remote_dyn_filter_producer_id);
self
}
pub fn name() -> &'static str {
"MergeScan"
}
@@ -170,8 +174,8 @@ impl MergeScanLogicalPlan {
&self.partition_cols
}
pub fn producer_scope_id(&self) -> ProducerScopeId {
self.producer_scope_id
pub fn remote_dyn_filter_producer_id(&self) -> Option<RemoteDynFilterProducerId> {
self.remote_dyn_filter_producer_id
}
}
@@ -189,7 +193,12 @@ pub struct MergeScanExec {
/// Metrics for each partition
partition_metrics: Arc<Mutex<HashMap<usize, PartitionMetrics>>>,
query_ctx: QueryContextRef,
remote_dyn_filter_producer_scope_id: ProducerScopeId,
/// Optional because remote dynamic filters must fail open.
///
/// If producer id assignment is missing, `MergeScanExec` still executes the query normally and
/// only skips RDF pushdown/registration while keeping parent-side filters as the correctness
/// fallback.
remote_dyn_filter_producer_id: Option<RemoteDynFilterProducerId>,
captured_remote_dyn_filters: Arc<Mutex<Vec<CapturedDynFilter>>>,
target_partition: usize,
partition_cols: AliasMapping,
@@ -217,7 +226,7 @@ impl MergeScanExec {
query_ctx: QueryContextRef,
target_partition: usize,
partition_cols: AliasMapping,
producer_scope_id: ProducerScopeId,
remote_dyn_filter_producer_id: Option<RemoteDynFilterProducerId>,
) -> Result<Self> {
// TODO(CookiePieWw): Initially we removed the metadata from the schema in #2000, but we have to
// keep it for #4619 to identify json type in src/datatypes/src/schema/column_schema.rs.
@@ -290,7 +299,7 @@ impl MergeScanExec {
partition_metrics: Arc::default(),
properties,
query_ctx,
remote_dyn_filter_producer_scope_id: producer_scope_id,
remote_dyn_filter_producer_id,
captured_remote_dyn_filters: Arc::default(),
target_partition,
partition_cols,
@@ -318,14 +327,14 @@ impl MergeScanExec {
let current_channel = self.query_ctx.channel();
let read_preference = self.query_ctx.read_preference();
let explain_verbose = self.query_ctx.explain_verbose();
let remote_dyn_filter_registry_cleanup = acquire_remote_dyn_filter_registry_cleanup(
let remote_dyn_filter_registry_lease = acquire_remote_dyn_filter_registry_lease(
context.as_ref(),
&query_ctx,
&captured_remote_dyn_filters,
);
let stream = Box::pin(stream!({
let _remote_dyn_filter_registry_cleanup = remote_dyn_filter_registry_cleanup;
let remote_dyn_filter_registry_lease = remote_dyn_filter_registry_lease;
// only report metrics once for each MergeScan
if partition == 0 {
MERGE_SCAN_REGIONS.observe(regions.len() as f64);
@@ -341,12 +350,15 @@ impl MergeScanExec {
.step_by(target_partition)
.copied()
{
bridge_dyn_filters_for_region(
context.as_ref(),
&query_ctx,
region_id,
&captured_remote_dyn_filters,
);
if let Some(remote_dyn_filter_registry_lease) =
remote_dyn_filter_registry_lease.as_ref()
{
register_dyn_filters_for_region(
remote_dyn_filter_registry_lease.registry(),
region_id,
&captured_remote_dyn_filters,
);
}
let region_span = tracing_context.attach(tracing::info_span!(
parent: &Span::current(),
@@ -531,7 +543,7 @@ impl MergeScanExec {
sub_stage_metrics: self.sub_stage_metrics.clone(),
partition_metrics: self.partition_metrics.clone(),
query_ctx: self.query_ctx.clone(),
remote_dyn_filter_producer_scope_id: self.remote_dyn_filter_producer_scope_id,
remote_dyn_filter_producer_id: self.remote_dyn_filter_producer_id,
captured_remote_dyn_filters: self.captured_remote_dyn_filters.clone(),
target_partition: self.target_partition,
partition_cols: self.partition_cols.clone(),
@@ -592,8 +604,8 @@ impl MergeScanExec {
#[cfg(test)]
impl MergeScanExec {
fn remote_dyn_filter_producer_scope_id(&self) -> ProducerScopeId {
self.remote_dyn_filter_producer_scope_id
fn remote_dyn_filter_producer_id(&self) -> Option<RemoteDynFilterProducerId> {
self.remote_dyn_filter_producer_id
}
}
@@ -715,10 +727,22 @@ impl ExecutionPlan for MergeScanExec {
.into_iter()
.map(|filter| filter.filter)
.collect::<Vec<_>>();
let remote_dyn_filter_pushdown = capture_remote_dyn_filters_for_pushdown(
self.remote_dyn_filter_producer_scope_id,
parent_filters,
);
let Some(remote_dyn_filter_producer_id) = self.remote_dyn_filter_producer_id else {
// RDF is an optimization: missing producer identity must only disable RDF, never block
// normal query execution or remove the parent-side filter fallback.
common_telemetry::warn!(
"MergeScan remote dynamic filter producer id is not assigned; skipping remote dynamic filter pushdown"
);
self.captured_remote_dyn_filters.lock().unwrap().clear();
let new_self = Arc::new(self.clone());
return Ok(FilterPushdownPropagation {
filters: parent_filters.into_iter().map(|_| PushedDown::No).collect(),
updated_node: Some(new_self),
});
};
let remote_dyn_filter_pushdown =
capture_remote_dyn_filters_for_pushdown(remote_dyn_filter_producer_id, parent_filters);
*self.captured_remote_dyn_filters.lock().unwrap() =
remote_dyn_filter_pushdown.captured_dyn_filters;
let new_self = Arc::new(self.clone());
@@ -881,7 +905,6 @@ mod tests {
let registry_manager = Arc::new(DynFilterRegistryManager::default());
let query_id = test_query_id(1);
registry_manager.get_or_init(query_id);
let first = registry_manager.acquire_lease(query_id);
let second = registry_manager.acquire_lease(query_id);
@@ -939,8 +962,8 @@ mod tests {
}
#[test]
fn try_with_new_distribution_preserves_producer_scope_id() {
let producer_scope_id = ProducerScopeId::new(42);
fn try_with_new_distribution_preserves_remote_dyn_filter_producer_id() {
let remote_dyn_filter_producer_id = RemoteDynFilterProducerId::new(42);
// Build a plan whose schema contains "col1"
let plan = LogicalPlanBuilder::empty(true)
@@ -976,13 +999,13 @@ mod tests {
query_ctx,
target_partition,
partition_cols,
producer_scope_id,
Some(remote_dyn_filter_producer_id),
)
.unwrap();
assert_eq!(
exec.remote_dyn_filter_producer_scope_id(),
producer_scope_id
exec.remote_dyn_filter_producer_id(),
Some(remote_dyn_filter_producer_id)
);
// A distribution that differs from the current partitioning but shares a
@@ -998,15 +1021,15 @@ mod tests {
.expect("expected a cloned exec with overlapping partition col");
assert_eq!(
cloned.remote_dyn_filter_producer_scope_id(),
producer_scope_id,
"try_with_new_distribution must preserve producer scope id"
cloned.remote_dyn_filter_producer_id(),
Some(remote_dyn_filter_producer_id),
"try_with_new_distribution must preserve remote dynamic filter producer id"
);
}
#[test]
fn remote_dyn_filter_preflight_keeps_parent_filter_until_dn_runtime_is_ready() {
let producer_scope_id = ProducerScopeId::new(42);
let remote_dyn_filter_producer_id = RemoteDynFilterProducerId::new(42);
let plan = LogicalPlanBuilder::empty(true)
.project(vec![lit(1i32).alias("col1")])
.unwrap()
@@ -1029,7 +1052,7 @@ mod tests {
query_ctx,
1,
AliasMapping::new(),
producer_scope_id,
Some(remote_dyn_filter_producer_id),
)
.unwrap();
let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(

View File

@@ -178,7 +178,7 @@ impl ExtensionPlanner for DistExtensionPlanner {
query_ctx,
session_state.config().target_partitions(),
merge_scan.partition_cols().clone(),
merge_scan.producer_scope_id(),
merge_scan.remote_dyn_filter_producer_id(),
)?;
Ok(Some(Arc::new(merge_scan_plan) as _))
}

View File

@@ -12,8 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock, Weak};
use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
@@ -23,7 +22,7 @@ use store_api::storage::RegionId;
use crate::dist_plan::FilterId;
/// Routing metadata for a remote dynamic filter subscriber.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Subscriber {
region_id: RegionId,
}
@@ -42,6 +41,7 @@ impl Subscriber {
#[derive(Debug, Clone)]
pub enum EntryRegistration {
Inserted(Arc<DynFilterEntry>),
/// The filter already existed; this contains the previously registered entry.
Existing(Arc<DynFilterEntry>),
}
@@ -60,7 +60,7 @@ pub enum SubscriberRegistration {
pub struct DynFilterEntry {
filter_id: FilterId,
alive_dyn_filter: Weak<DynamicFilterPhysicalExpr>,
subscribers: RwLock<Vec<Subscriber>>,
subscribers: RwLock<HashSet<Subscriber>>,
}
#[derive(Debug)]
@@ -73,7 +73,7 @@ impl DynFilterEntry {
Self {
filter_id,
alive_dyn_filter: Arc::downgrade(&alive_dyn_filter),
subscribers: RwLock::new(Vec::new()),
subscribers: RwLock::new(HashSet::new()),
}
}
@@ -86,17 +86,12 @@ impl DynFilterEntry {
}
pub fn subscribers(&self) -> Vec<Subscriber> {
self.subscribers.read().unwrap().clone()
self.subscribers.read().unwrap().iter().cloned().collect()
}
pub fn register_subscriber(&self, subscriber: Subscriber) -> bool {
let mut subscribers = self.subscribers.write().unwrap();
if subscribers.contains(&subscriber) {
return false;
}
subscribers.push(subscriber);
true
subscribers.insert(subscriber)
}
}
@@ -104,7 +99,6 @@ impl DynFilterEntry {
#[derive(Debug)]
pub struct QueryDynFilterRegistry {
query_id: QueryId,
active_streams: AtomicUsize,
inner: RwLock<QueryDynFilterRegistryInner>,
}
@@ -112,7 +106,6 @@ impl QueryDynFilterRegistry {
pub fn new(query_id: QueryId) -> Self {
Self {
query_id,
active_streams: AtomicUsize::new(0),
inner: RwLock::new(QueryDynFilterRegistryInner {
entries: HashMap::new(),
}),
@@ -123,18 +116,6 @@ impl QueryDynFilterRegistry {
self.query_id
}
fn acquire_stream(&self) {
self.active_streams.fetch_add(1, Ordering::SeqCst);
}
fn release_stream(&self) {
self.active_streams.fetch_sub(1, Ordering::SeqCst);
}
fn active_stream_count(&self) -> usize {
self.active_streams.load(Ordering::SeqCst)
}
pub fn entry_count(&self) -> usize {
self.inner.read().unwrap().entries.len()
}
@@ -185,10 +166,20 @@ impl QueryDynFilterRegistry {
}
}
/// Stream-scoped lease that keeps a query registry alive.
///
/// The manager only stores a weak index. Holding this lease is the production path for owning a
/// strong registry reference; this keeps cleanup tied to stream lifetime instead of a hand-rolled
/// active-stream counter.
#[derive(Debug)]
pub struct RemoteDynFilterRegistryLease {
registry_manager: Arc<DynFilterRegistryManager>,
registry: Arc<QueryDynFilterRegistry>,
/// Always `Some` while the lease is alive.
///
/// This is wrapped in `Option` only so `Drop` can `take()` and release the strong `Arc` before
/// checking whether the manager's weak index became dead. Without this explicit drop order,
/// the field would remain alive until after `Drop::drop` returns.
registry: Option<Arc<QueryDynFilterRegistry>>,
}
impl RemoteDynFilterRegistryLease {
@@ -196,80 +187,167 @@ impl RemoteDynFilterRegistryLease {
registry_manager: Arc<DynFilterRegistryManager>,
registry: Arc<QueryDynFilterRegistry>,
) -> Self {
registry.acquire_stream();
Self {
registry_manager,
registry,
registry: Some(registry),
}
}
/// Borrows the query registry owned by this lease.
///
/// # Panics
///
/// Panics if the lease no longer holds a registry.
pub fn registry(&self) -> &QueryDynFilterRegistry {
    let held = self.registry.as_deref();
    held.expect("remote dyn filter registry lease must hold a registry")
}
/// Test helper: reports whether both leases point at the same registry allocation.
#[cfg(test)]
pub(crate) fn ptr_eq(&self, other: &Self) -> bool {
    let lhs = self.registry.as_ref().unwrap();
    let rhs = other.registry.as_ref().unwrap();
    Arc::ptr_eq(lhs, rhs)
}
}
impl Drop for RemoteDynFilterRegistryLease {
fn drop(&mut self) {
self.registry.release_stream();
let Some(registry) = self.registry.take() else {
return;
};
let query_id = registry.query_id();
let registry_weak = Arc::downgrade(&registry);
// Release this lease's strong reference before checking whether the manager's weak entry is
// dead. If two leases drop concurrently, checking `Arc::strong_count` before the fields are
// dropped can make both drops observe each other and both skip cleanup.
drop(registry);
let _ = self
.registry_manager
.remove_if_inactive(&self.registry.query_id(), &self.registry);
.remove_if_dropped_registry(&query_id, &registry_weak);
}
}
/// Query-engine manager for query-scoped remote dynamic filter registries.
///
/// This is an index, not an owner: the map stores `Weak` pointers and active streams own the
/// registry through [`RemoteDynFilterRegistryLease`]. Keep production access lease-based so
/// dropping a lease releases one strong owner before the manager prunes a dead weak entry.
#[derive(Debug, Default)]
pub struct DynFilterRegistryManager {
registries: RwLock<HashMap<QueryId, Arc<QueryDynFilterRegistry>>>,
registries: RwLock<HashMap<QueryId, Weak<QueryDynFilterRegistry>>>,
}
impl DynFilterRegistryManager {
pub fn get(&self, query_id: &QueryId) -> Option<Arc<QueryDynFilterRegistry>> {
self.registries.read().unwrap().get(query_id).cloned()
/// Test helper: looks up the live registry for `query_id`.
///
/// Returns `None` when no entry exists. When an entry exists but can no longer be upgraded,
/// the stale weak entry is pruned before returning `None`.
#[cfg(test)]
fn get(&self, query_id: &QueryId) -> Option<Arc<QueryDynFilterRegistry>> {
    // Keep the read guard confined to this block so it is released before any pruning, which
    // needs the write lock.
    let (live, weak_entry) = {
        let guard = self.registries.read().unwrap();
        let entry = guard.get(query_id)?;
        let upgraded = entry.upgrade();
        (upgraded, entry.clone())
    };

    if live.is_some() {
        return live;
    }
    self.remove_stale_entry(query_id, &weak_entry);
    None
}
#[cfg(test)]
fn remove(&self, query_id: &QueryId) -> Option<Arc<QueryDynFilterRegistry>> {
fn remove(&self, query_id: &QueryId) -> Option<Weak<QueryDynFilterRegistry>> {
self.registries.write().unwrap().remove(query_id)
}
fn remove_if_inactive(
fn remove_if_dropped_registry(
&self,
query_id: &QueryId,
registry: &Arc<QueryDynFilterRegistry>,
) -> Option<Arc<QueryDynFilterRegistry>> {
let mut registries = self.registries.write().unwrap();
dropped_registry: &Weak<QueryDynFilterRegistry>,
) -> Option<Weak<QueryDynFilterRegistry>> {
let mut registries = self
.registries
.write()
.unwrap_or_else(|poisoned| poisoned.into_inner());
let current = registries.get(query_id)?;
if Arc::ptr_eq(current, registry) && registry.active_stream_count() == 0 {
// Compare `Weak` handles rather than raw pointers. The weak control block stays alive while
// either this drop's local weak handle or the manager's weak entry exists. `ptr_eq` prevents
// an old lease drop from removing a different registry installed for the same query id, and
// `upgrade().is_none()` ensures we only prune dead entries.
if current.ptr_eq(dropped_registry) && current.upgrade().is_none() {
registries.remove(query_id)
} else {
None
}
}
/// Test helper: prunes the map entry for `query_id` when it is still `stale_registry` and can
/// no longer be upgraded.
///
/// Delegates to `remove_if_dropped_registry` so the "same weak handle and already dead" check
/// lives in one place. The removed weak handle, if any, is discarded. Unlike a bare
/// `write().unwrap()`, the shared helper recovers a poisoned lock instead of panicking.
#[cfg(test)]
fn remove_stale_entry(
    &self,
    query_id: &QueryId,
    stale_registry: &Weak<QueryDynFilterRegistry>,
) {
    let _ = self.remove_if_dropped_registry(query_id, stale_registry);
}
/// Hands out a stream-owned lease on the registry for `query_id`.
///
/// The registry is obtained through `get_or_init`, which returns a strong `Arc` while still
/// holding the manager's write lock. The lease therefore already owns the registry before a
/// concurrent final lease drop can prune the map entry.
pub fn acquire_lease(self: &Arc<Self>, query_id: QueryId) -> RemoteDynFilterRegistryLease {
    let registry = self.get_or_init(query_id);
    let manager = Arc::clone(self);
    RemoteDynFilterRegistryLease::new(manager, registry)
}
pub fn get_or_init(&self, query_id: QueryId) -> Arc<QueryDynFilterRegistry> {
fn get_or_init(&self, query_id: QueryId) -> Arc<QueryDynFilterRegistry> {
let mut registries = self.registries.write().unwrap();
registries
.entry(query_id)
.or_insert_with(|| Arc::new(QueryDynFilterRegistry::new(query_id)))
.clone()
if let Some(registry) = registries.get(&query_id).and_then(Weak::upgrade) {
return registry;
}
let registry = Arc::new(QueryDynFilterRegistry::new(query_id));
registries.insert(query_id, Arc::downgrade(&registry));
registry
}
/// Test helper: counts the map entries whose registry still has at least one strong owner.
#[cfg(test)]
pub fn registry_count(&self) -> usize {
    // This is only a snapshot for assertions. Lifecycle decisions must not depend on it;
    // cleanup relies on the lease-owned `Arc` and weak-entry pruning instead.
    let registries = self.registries.read().unwrap();
    let mut live = 0;
    for entry in registries.values() {
        if entry.strong_count() > 0 {
            live += 1;
        }
    }
    live
}
/// Test helper: counts every entry in the weak index, whether or not it is still upgradable.
#[cfg(test)]
fn weak_entry_count(&self) -> usize {
    let registries = self.registries.read().unwrap();
    registries.len()
}
}
#[cfg(test)]
mod tests {
use std::sync::Barrier;
use std::thread;
use datafusion_physical_expr::expressions::{Column, lit};
use uuid::Uuid;
use super::*;
use crate::dist_plan::{FilterFingerprint, ProducerScopeId};
use crate::dist_plan::{FilterFingerprint, RemoteDynFilterProducerId};
fn test_query_id(value: u128) -> QueryId {
QueryId::from(Uuid::from_u128(value))
@@ -277,7 +355,7 @@ mod tests {
fn test_filter_id(producer_ordinal: u32) -> FilterId {
FilterId::new(
ProducerScopeId::new(42),
RemoteDynFilterProducerId::new(42),
producer_ordinal,
FilterFingerprint::new(0xabc),
)
@@ -295,25 +373,32 @@ mod tests {
#[test]
fn registry_manager_returns_same_registry_for_same_query() {
let manager = DynFilterRegistryManager::default();
let manager = Arc::new(DynFilterRegistryManager::default());
let query_id = test_query_id(1);
let first = manager.get_or_init(query_id);
let second = manager.get_or_init(query_id);
let first = manager.acquire_lease(query_id);
let second = manager.acquire_lease(query_id);
assert!(Arc::ptr_eq(&first, &second));
assert!(first.ptr_eq(&second));
assert_eq!(manager.registry_count(), 1);
assert_eq!(manager.weak_entry_count(), 1);
}
#[test]
fn registry_manager_removes_registry_for_query() {
let manager = DynFilterRegistryManager::default();
let manager = Arc::new(DynFilterRegistryManager::default());
let query_id = test_query_id(1);
let registry = manager.get_or_init(query_id);
let lease = manager.acquire_lease(query_id);
assert!(Arc::ptr_eq(&manager.remove(&query_id).unwrap(), &registry));
assert!(
manager
.remove(&query_id)
.unwrap()
.ptr_eq(&Arc::downgrade(lease.registry.as_ref().unwrap()))
);
assert!(manager.get(&query_id).is_none());
assert_eq!(manager.registry_count(), 0);
assert_eq!(manager.weak_entry_count(), 0);
}
#[test]
@@ -324,12 +409,16 @@ mod tests {
let first = manager.acquire_lease(query_id);
let second = manager.acquire_lease(query_id);
assert!(first.ptr_eq(&second));
assert_eq!(manager.registry_count(), 1);
assert_eq!(manager.weak_entry_count(), 1);
drop(first);
assert_eq!(manager.registry_count(), 1);
assert_eq!(manager.weak_entry_count(), 1);
drop(second);
assert_eq!(manager.registry_count(), 0);
assert_eq!(manager.weak_entry_count(), 0);
}
#[test]
@@ -339,11 +428,141 @@ mod tests {
let first = manager.acquire_lease(query_id);
drop(first);
assert_eq!(manager.registry_count(), 0);
assert_eq!(manager.weak_entry_count(), 0);
let second = manager.acquire_lease(query_id);
assert_eq!(manager.registry_count(), 1);
assert_eq!(manager.weak_entry_count(), 1);
drop(second);
assert_eq!(manager.registry_count(), 0);
assert_eq!(manager.weak_entry_count(), 0);
}
/// Two leases on the same query are dropped from separate threads released by one barrier;
/// afterwards neither a live registry nor a weak entry may remain.
#[test]
fn registry_manager_concurrent_final_lease_drop_cleans_weak_entry() {
    let manager = Arc::new(DynFilterRegistryManager::default());
    let query_id = test_query_id(1);
    let leases = vec![
        manager.acquire_lease(query_id),
        manager.acquire_lease(query_id),
    ];

    // One barrier slot per dropping thread plus one for this thread.
    let start = Arc::new(Barrier::new(leases.len() + 1));
    let droppers = leases
        .into_iter()
        .map(|lease| {
            let start = Arc::clone(&start);
            thread::spawn(move || {
                start.wait();
                drop(lease);
            })
        })
        .collect::<Vec<_>>();

    start.wait();
    for dropper in droppers {
        dropper.join().unwrap();
    }

    assert_eq!(manager.registry_count(), 0);
    assert_eq!(manager.weak_entry_count(), 0);
}
/// Eight threads acquire the first lease for one query at the same time; all of them must end
/// up sharing a single registry, and dropping every lease must leave the manager empty.
#[test]
fn registry_manager_concurrent_first_acquire_shares_registry() {
    let manager = Arc::new(DynFilterRegistryManager::default());
    let query_id = test_query_id(1);
    let worker_count = 8;
    let start = Arc::new(Barrier::new(worker_count + 1));

    let mut workers = Vec::with_capacity(worker_count);
    for _ in 0..worker_count {
        let worker_manager = Arc::clone(&manager);
        let worker_start = Arc::clone(&start);
        workers.push(thread::spawn(move || {
            worker_start.wait();
            worker_manager.acquire_lease(query_id)
        }));
    }
    start.wait();

    let mut leases = Vec::with_capacity(worker_count);
    for worker in workers {
        leases.push(worker.join().unwrap());
    }

    let first = leases.first().unwrap();
    assert!(leases.iter().all(|lease| first.ptr_eq(lease)));
    assert_eq!(manager.registry_count(), 1);
    assert_eq!(manager.weak_entry_count(), 1);

    drop(leases);
    assert_eq!(manager.registry_count(), 0);
    assert_eq!(manager.weak_entry_count(), 0);
}
/// Repeatedly races the drop of the only lease against a fresh acquire for the same query.
///
/// Whichever thread wins, exactly one live registry must exist while the new lease is held,
/// and nothing may remain once it is dropped.
#[test]
fn registry_manager_drop_racing_acquire_does_not_leave_stale_entry() {
    let manager = Arc::new(DynFilterRegistryManager::default());
    let query_id = test_query_id(1);
    let assert_counts = |expected: usize| {
        assert_eq!(manager.registry_count(), expected);
        assert_eq!(manager.weak_entry_count(), expected);
    };

    for _ in 0..64 {
        let previous = manager.acquire_lease(query_id);
        let start = Arc::new(Barrier::new(3));

        let dropper = {
            let start = Arc::clone(&start);
            thread::spawn(move || {
                start.wait();
                drop(previous);
            })
        };
        let acquirer = {
            let start = Arc::clone(&start);
            let manager = Arc::clone(&manager);
            thread::spawn(move || {
                start.wait();
                manager.acquire_lease(query_id)
            })
        };

        start.wait();
        dropper.join().unwrap();
        let fresh = acquirer.join().unwrap();
        assert_counts(1);

        drop(fresh);
        assert_counts(0);
    }
}
/// A cleanup attempt that carries the weak handle of an already-dropped registry must leave a
/// replacement registry installed for the same query id untouched.
#[test]
fn registry_manager_old_drop_cannot_remove_replacement_registry() {
    let manager = Arc::new(DynFilterRegistryManager::default());
    let query_id = test_query_id(1);
    let assert_counts = |expected: usize| {
        assert_eq!(manager.registry_count(), expected);
        assert_eq!(manager.weak_entry_count(), expected);
    };

    // Keep a weak handle to the first registry, then let its only lease go away.
    let old_lease = manager.acquire_lease(query_id);
    let old_registry = Arc::downgrade(old_lease.registry.as_ref().unwrap());
    drop(old_lease);
    assert_counts(0);

    let replacement_lease = manager.acquire_lease(query_id);
    assert_counts(1);

    let removed = manager.remove_if_dropped_registry(&query_id, &old_registry);
    assert!(
        removed.is_none(),
        "old registry cleanup must not remove the replacement weak entry"
    );
    assert_counts(1);

    drop(replacement_lease);
    assert_counts(0);
}
#[test]

View File

@@ -395,7 +395,7 @@ mod tests {
use table::table_name::TableName;
use super::*;
use crate::dist_plan::ProducerScopeId;
use crate::dist_plan::RemoteDynFilterProducerId;
use crate::options::{FLOW_RETURN_REGION_SEQ, FLOW_SINK_TABLE_ID};
use crate::region_query::RegionQueryHandler;
@@ -459,7 +459,7 @@ mod tests {
query_ctx,
1,
BTreeMap::<String, BTreeSet<datafusion_common::Column>>::new(),
ProducerScopeId::new(0),
Some(RemoteDynFilterProducerId::new(0)),
)
.unwrap(),
)

View File

@@ -154,7 +154,7 @@ mod tests {
use table::table_name::TableName;
use super::*;
use crate::dist_plan::ProducerScopeId;
use crate::dist_plan::RemoteDynFilterProducerId;
use crate::error::Result as QueryResult;
use crate::region_query::RegionQueryHandler;
@@ -292,7 +292,7 @@ mod tests {
QueryContext::arc(),
32,
partition_cols,
ProducerScopeId::new(1),
Some(RemoteDynFilterProducerId::new(1)),
)
.unwrap()
}

View File

@@ -57,7 +57,7 @@ use table::table::adapter::DfTableProviderAdapter;
use crate::QueryEngineContext;
use crate::dist_plan::{
DistExtensionPlanner, DistPlannerAnalyzer, DistPlannerOptions, DynFilterRegistryManager,
MergeSortExtensionPlanner, QueryDynFilterRegistry,
MergeSortExtensionPlanner, RemoteDynFilterRegistryLease,
};
use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES};
use crate::optimizer::ExtensionAnalyzerRule;
@@ -404,12 +404,16 @@ impl QueryEngineState {
self.dyn_filter_registry_manager.clone()
}
pub fn get_or_init_remote_dyn_filter_registry(
pub fn acquire_remote_dyn_filter_registry_lease(
&self,
query_ctx: &QueryContextRef,
) -> Option<Arc<QueryDynFilterRegistry>> {
) -> Option<RemoteDynFilterRegistryLease> {
let query_id = query_ctx.remote_query_id_value()?;
Some(self.dyn_filter_registry_manager.get_or_init(query_id))
Some(
self.dyn_filter_registry_manager
.clone()
.acquire_lease(query_id),
)
}
pub fn function_state(&self) -> Arc<FunctionState> {
@@ -617,20 +621,23 @@ mod tests {
}
#[test]
fn query_engine_state_reuses_query_scoped_dyn_filter_registry() {
fn query_engine_state_reuses_query_scoped_dyn_filter_registry_lease() {
let state = new_query_engine_state();
let query_ctx = QueryContext::arc();
let first = state
.get_or_init_remote_dyn_filter_registry(&query_ctx)
.acquire_remote_dyn_filter_registry_lease(&query_ctx)
.unwrap();
let second = state
.get_or_init_remote_dyn_filter_registry(&query_ctx)
.acquire_remote_dyn_filter_registry_lease(&query_ctx)
.unwrap();
assert!(Arc::ptr_eq(&first, &second));
assert!(first.ptr_eq(&second));
assert_eq!(state.dyn_filter_registry_manager().registry_count(), 1);
assert_eq!(first.query_id(), query_ctx.remote_query_id_value().unwrap());
assert_eq!(
first.registry().query_id(),
query_ctx.remote_query_id_value().unwrap()
);
}
#[test]
@@ -640,12 +647,12 @@ mod tests {
assert!(query_ctx.remote_query_id_value().is_some());
let registry = state
.get_or_init_remote_dyn_filter_registry(&query_ctx)
let lease = state
.acquire_remote_dyn_filter_registry_lease(&query_ctx)
.unwrap();
assert_eq!(
registry.query_id(),
lease.registry().query_id(),
query_ctx.remote_query_id_value().unwrap()
);
assert_eq!(state.dyn_filter_registry_manager().registry_count(), 1);
@@ -658,20 +665,20 @@ mod tests {
let second_query_ctx = QueryContext::arc();
let first = state
.get_or_init_remote_dyn_filter_registry(&first_query_ctx)
.acquire_remote_dyn_filter_registry_lease(&first_query_ctx)
.unwrap();
let second = state
.get_or_init_remote_dyn_filter_registry(&second_query_ctx)
.acquire_remote_dyn_filter_registry_lease(&second_query_ctx)
.unwrap();
assert!(!Arc::ptr_eq(&first, &second));
assert!(!first.ptr_eq(&second));
assert_eq!(state.dyn_filter_registry_manager().registry_count(), 2);
assert_eq!(
first.query_id(),
first.registry().query_id(),
first_query_ctx.remote_query_id_value().unwrap()
);
assert_eq!(
second.query_id(),
second.registry().query_id(),
second_query_ctx.remote_query_id_value().unwrap()
);
}