From 855155563b4b2ba175dfe255ab5a6996acef165a Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 13 Apr 2026 14:39:22 +0800 Subject: [PATCH] feat: filter id Signed-off-by: discord9 --- Cargo.lock | 1 + src/query/Cargo.toml | 1 + src/query/src/dist_plan/merge_scan.rs | 106 +++++++++++++++++++++++++- 3 files changed, 107 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 883da18288..4e8179d59f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11144,6 +11144,7 @@ dependencies = [ "datafusion-functions", "datafusion-optimizer", "datafusion-physical-expr", + "datafusion-proto", "datafusion-sql", "datatypes", "either", diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index b92921d29b..9d9f8b24b0 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -44,6 +44,7 @@ datafusion-expr-common.workspace = true datafusion-functions.workspace = true datafusion-optimizer.workspace = true datafusion-physical-expr.workspace = true +datafusion-proto.workspace = true datafusion-sql.workspace = true datatypes.workspace = true either.workspace = true diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 470b4d325f..79926bafae 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -13,6 +13,7 @@ // limitations under the License. 
use std::any::Any; +use std::hash::{DefaultHasher, Hasher}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -37,11 +38,16 @@ use datafusion::physical_plan::{ use datafusion_common::{Column as ColumnExpr, DataFusionError, Result}; use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion_physical_expr::expressions::Column; -use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr}; +use datafusion_physical_expr::{ + Distribution, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr, +}; +use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec; +use datafusion_proto::physical_plan::to_proto::serialize_physical_expr; use futures_util::StreamExt; use greptime_proto::v1::region::RegionRequestHeader; use meter_core::data::ReadItem; use meter_macros::read_meter; +use prost::Message; use session::context::QueryContextRef; use store_api::storage::RegionId; use table::table_name::TableName; @@ -499,6 +505,42 @@ impl MergeScanExec { } } +/// Builds the Phase 1 query-local remote dynamic filter identity. +/// +/// The identity is `region_id + producer-local ordinal + canonicalized child fingerprint`. +/// Subscriber routing details such as `partition` stay outside this key so they can remain in +/// the later fanout/subscriber map instead of splitting one shared remote filter state. 
+#[allow(unused)] // no caller outside the tests in this change yet
+pub(crate) fn build_remote_dyn_filter_id(
+    region_id: RegionId,
+    producer_local_ordinal: usize,
+    children: &[Arc<dyn PhysicalExpr>],
+) -> Result<String> {
+    let children_fingerprint = canonicalize_dyn_filter_children(children)?;
+    Ok(format!(
+        "{region_id}:{producer_local_ordinal}:{children_fingerprint}"
+    ))
+}
+
+/// Hashes each child's protobuf encoding separately and joins the digests with `,`, so child
+/// order is part of the result and an empty slice yields an empty string.
+/// NOTE(review): `DefaultHasher` output is not guaranteed stable across Rust releases; confirm
+/// that every party comparing these ids is built with the same toolchain.
+fn canonicalize_dyn_filter_children(children: &[Arc<dyn PhysicalExpr>]) -> Result<String> {
+    let codec = DefaultPhysicalExtensionCodec {};
+    let mut encoded_children = Vec::with_capacity(children.len());
+    for child in children {
+        // Encoding into a growable `Vec` cannot fail, so there is no error to map.
+        let bytes = serialize_physical_expr(child, &codec)?.encode_to_vec();
+        let mut hasher = DefaultHasher::new();
+        // Length prefix as fixed-width little-endian bytes so the digest does not vary by target.
+        hasher.write(&(bytes.len() as u64).to_le_bytes());
+        hasher.write(&bytes);
+        encoded_children.push(format!("{:016x}", hasher.finish()));
+    }
+    Ok(encoded_children.join(","))
+}
+
 /// Metrics for a region of a partition.
 #[derive(Debug, Clone)]
 struct RegionMetrics {
@@ -713,3 +755,65 @@ impl MergeScanMetric {
         self.greptime_exec_cost.add(metrics);
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Builds one `Column` expression per name, using the name's position as its index.
+    fn test_children(names: &[&str]) -> Vec<Arc<dyn PhysicalExpr>> {
+        names
+            .iter()
+            .enumerate()
+            .map(|(index, name)| Arc::new(Column::new(*name, index)) as Arc<dyn PhysicalExpr>)
+            .collect()
+    }
+
+    #[test]
+    fn remote_dyn_filter_id_is_stable_for_equivalent_children() {
+        let region_id = RegionId::new(1024, 7);
+        let first =
+            build_remote_dyn_filter_id(region_id, 3, &test_children(&["host", "pod"])).unwrap();
+        let second =
+            build_remote_dyn_filter_id(region_id, 3, &test_children(&["host", "pod"])).unwrap();
+
+        assert_eq!(first, second);
+    }
+
+    #[test]
+    fn remote_dyn_filter_id_changes_when_identity_inputs_change() {
+        let children = test_children(&["host", "pod"]);
+        let baseline = build_remote_dyn_filter_id(RegionId::new(1024, 7), 3, &children).unwrap();
+        let different_region =
+            build_remote_dyn_filter_id(RegionId::new(1024, 8), 3, &children).unwrap();
+        let different_ordinal =
+            build_remote_dyn_filter_id(RegionId::new(1024, 7), 4, &children).unwrap();
+        let different_children =
+            build_remote_dyn_filter_id(RegionId::new(1024, 7), 3, &test_children(&["pod", "host"]))
+                .unwrap();
+
+        assert_ne!(baseline, different_region);
+        assert_ne!(baseline, different_ordinal);
+        assert_ne!(baseline, different_children);
+    }
+
+    #[test]
+    fn remote_dyn_filter_id_has_no_partition_dimension() {
+        // The id is exactly region, ordinal and child fingerprint; nothing else is mixed in.
+        let region_id = RegionId::new(2048, 1);
+        let children = test_children(&["host"]);
+        let fingerprint = canonicalize_dyn_filter_children(&children).unwrap();
+        let id = build_remote_dyn_filter_id(region_id, 0, &children).unwrap();
+        assert_eq!(id, format!("{region_id}:0:{fingerprint}"));
+    }
+
+    #[test]
+    fn remote_dyn_filter_id_supports_empty_children() {
+        let region_id = RegionId::new(4096, 2);
+        let first = build_remote_dyn_filter_id(region_id, 1, &[]).unwrap();
+        let second = build_remote_dyn_filter_id(region_id, 1, &[]).unwrap();
+
+        assert_eq!(first, second);
+        assert_eq!(first, format!("{region_id}:1:"));
+    }
+}