diff --git a/src/query/src/dist_plan/dyn_filter_bridge.rs b/src/query/src/dist_plan/dyn_filter_bridge.rs index e21805eadf..3a27fea5c5 100644 --- a/src/query/src/dist_plan/dyn_filter_bridge.rs +++ b/src/query/src/dist_plan/dyn_filter_bridge.rs @@ -84,7 +84,7 @@ pub(crate) fn register_dyn_filters_for_region( region_id: RegionId, captured_dyn_filters: &[CapturedDynFilter], ) { - for resolved_filter in resolved_dyn_filters(region_id, captured_dyn_filters) { + for resolved_filter in resolved_dyn_filters(captured_dyn_filters) { let _ = registry.register_remote_dyn_filter( resolved_filter.filter_id.clone(), resolved_filter.alive_dyn_filter, @@ -115,10 +115,7 @@ pub(crate) fn bridge_dyn_filters_for_region( register_dyn_filters_for_region(®istry, region_id, captured_dyn_filters); } -fn resolve_dyn_filter( - region_id: RegionId, - captured_dyn_filter: &CapturedDynFilter, -) -> Result { +fn resolve_dyn_filter(captured_dyn_filter: &CapturedDynFilter) -> Result { let children = captured_dyn_filter .alive_dyn_filter .children() @@ -126,7 +123,6 @@ fn resolve_dyn_filter( .cloned() .collect::>(); let filter_id = build_remote_dyn_filter_id( - region_id, captured_dyn_filter.producer_scope_id, captured_dyn_filter.producer_local_ordinal, &children, @@ -139,22 +135,18 @@ fn resolve_dyn_filter( }) } -fn resolved_dyn_filters( - region_id: RegionId, - captured_dyn_filters: &[CapturedDynFilter], -) -> Vec { +fn resolved_dyn_filters(captured_dyn_filters: &[CapturedDynFilter]) -> Vec { captured_dyn_filters .iter() - .filter_map(|captured_dyn_filter| resolve_dyn_filter(region_id, captured_dyn_filter).ok()) + .filter_map(|captured_dyn_filter| resolve_dyn_filter(captured_dyn_filter).ok()) .collect() } fn build_initial_dyn_filter_regs_for_region( - region_id: RegionId, captured_dyn_filters: &[CapturedDynFilter], ) -> InitialDynFilterRegs { InitialDynFilterRegs::new( - resolved_dyn_filters(region_id, captured_dyn_filters) + resolved_dyn_filters(captured_dyn_filters) .into_iter() .filter_map(|resolved_filter| { match InitialDynFilterReg::from_filter_id_and_children( @@ -178,7 +170,7 @@ pub(crate) fn query_context_with_initial_dyn_filter_regs( captured_dyn_filters: &[CapturedDynFilter], ) -> QueryContext { let mut region_query_ctx = query_ctx.as_ref().clone(); - let regs = build_initial_dyn_filter_regs_for_region(region_id, captured_dyn_filters); + let regs = build_initial_dyn_filter_regs_for_region(captured_dyn_filters); if regs.is_empty() { return region_query_ctx; } @@ -249,10 +241,11 @@ mod tests { lit(true) as _, )), }]; - let region_id = RegionId::new(1024, 7); + let first_region_id = RegionId::new(1024, 7); + let second_region_id = RegionId::new(1024, 8); - register_dyn_filters_for_region(®istry, region_id, &captured_dyn_filters); - register_dyn_filters_for_region(®istry, region_id, &captured_dyn_filters); + register_dyn_filters_for_region(®istry, first_region_id, &captured_dyn_filters); + register_dyn_filters_for_region(®istry, second_region_id, &captured_dyn_filters); assert_eq!(registry.entry_count(), 1); let entry = registry.entries().pop().unwrap(); @@ -261,8 +254,18 @@ mod tests { test_producer_scope(42) ); assert_eq!(entry.filter_id().producer_ordinal(), 2); - assert_eq!(entry.subscribers().len(), 1); - assert_eq!(entry.subscribers()[0].region_id(), region_id); + let subscribers = entry.subscribers(); + assert_eq!(subscribers.len(), 2); + assert!( + subscribers + .iter() + .any(|subscriber| subscriber.region_id() == first_region_id) + ); + assert!( + subscribers + .iter() + .any(|subscriber| subscriber.region_id() == second_region_id) + ); } #[test] @@ -326,7 +329,6 @@ mod tests { ) .unwrap(); let expected_filter_id = build_remote_dyn_filter_id( - region_id, captured_dyn_filters[0].producer_scope_id, captured_dyn_filters[0].producer_local_ordinal, &[Arc::new(Column::new("host", 0)) as Arc<_>], diff --git a/src/query/src/dist_plan/filter_id.rs b/src/query/src/dist_plan/filter_id.rs index 767e5e1d7e..b16fe48ac4 100644 --- a/src/query/src/dist_plan/filter_id.rs +++ b/src/query/src/dist_plan/filter_id.rs @@ -23,7 +23,6 @@ use datafusion_physical_expr::PhysicalExpr; use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec; use datafusion_proto::physical_plan::to_proto::serialize_physical_expr; use prost::Message; -use store_api::storage::RegionId; #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub struct FilterFingerprint(u64); @@ -81,7 +80,6 @@ impl FromStr for ProducerScopeId { #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct FilterId { - region_id: RegionId, producer_scope_id: ProducerScopeId, producer_ordinal: u32, children_fingerprint: FilterFingerprint, @@ -93,23 +91,17 @@ pub struct FilterId { impl FilterId { pub fn new( - region_id: RegionId, producer_scope_id: ProducerScopeId, producer_ordinal: u32, children_fingerprint: FilterFingerprint, ) -> Self { Self { - region_id, producer_scope_id, producer_ordinal, children_fingerprint, } } - pub fn region_id(&self) -> RegionId { - self.region_id - } - pub fn producer_ordinal(&self) -> u32 { self.producer_ordinal } @@ -127,11 +119,8 @@ impl Display for FilterId { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{}:{}:{}:{}", - self.region_id.as_u64(), - self.producer_scope_id, - self.producer_ordinal, - self.children_fingerprint + "{}:{}:{}", + self.producer_scope_id, self.producer_ordinal, self.children_fingerprint ) } } @@ -141,12 +130,6 @@ impl FromStr for FilterId { fn from_str(s: &str) -> Result { let mut parts = s.split(':'); - let region_id = parts - .next() - .ok_or(ParseFilterIdError)? - .parse::() - .map(RegionId::from_u64) - .map_err(|_| ParseFilterIdError)?; let producer_scope_id = parts .next() .ok_or(ParseFilterIdError)? @@ -167,7 +150,6 @@ impl FromStr for FilterId { } Ok(Self::new( - region_id, producer_scope_id, producer_local_ordinal, children_fingerprint, @@ -186,15 +168,14 @@ impl Display for ParseFilterIdError { /// Builds the query-local remote dynamic filter identity. /// -/// The identity is `region_id + producer scope + producer-local ordinal + canonicalized child fingerprint`. -/// Subscriber routing details such as `partition` stay outside this key so they can remain in -/// the later fanout/subscriber map instead of splitting one shared remote filter state. +/// The identity is `producer scope + producer-local ordinal + canonicalized child fingerprint`. +/// Subscriber routing details such as `region_id` and `partition` stay outside this key so they +/// can remain in the later fanout/subscriber map instead of splitting one shared remote filter state. /// /// NOTE(remote-dyn-filter): This id is generated once on the source side and then propagated. /// Consumers should reuse the propagated `FilterId` instead of independently recomputing it from /// local state. pub(crate) fn build_remote_dyn_filter_id( - region_id: RegionId, producer_scope_id: ProducerScopeId, producer_local_ordinal: usize, children: &[Arc], @@ -205,7 +186,6 @@ pub(crate) fn build_remote_dyn_filter_id( DataFusionError::Execution("producer ordinal out of range for filter id".to_string()) })?; Ok(FilterId::new( - region_id, producer_scope_id, producer_local_ordinal, children_fingerprint, @@ -248,24 +228,20 @@ mod tests { #[test] fn filter_id_round_trips_through_string() { - let filter_id = FilterId::new( - RegionId::new(1024, 7), - ProducerScopeId::new(42), - 3, - FilterFingerprint::new(0xabc), - ); + let filter_id = FilterId::new(ProducerScopeId::new(42), 3, FilterFingerprint::new(0xabc)); let encoded = filter_id.to_string(); + assert_eq!(encoded, "000000000000002a:3:0000000000000abc"); assert_eq!(encoded.parse::().unwrap(), filter_id); } #[test] fn filter_id_rejects_malformed_strings() { assert!("".parse::().is_err()); - assert!("1024:3".parse::().is_err()); - assert!("1024:0000000000000001:3:zzzz".parse::().is_err()); + assert!("0000000000000001:3".parse::().is_err()); + assert!("0000000000000001:3:zzzz".parse::().is_err()); assert!( - "1024:0000000000000001:3:0000000000000abc:extra" + "0000000000000001:3:0000000000000abc:extra" .parse::() .is_err() ); @@ -273,22 +249,13 @@ mod tests { #[test] fn remote_dyn_filter_id_is_stable_for_equivalent_children() { - let region_id = RegionId::new(1024, 7); let producer_scope_id = ProducerScopeId::new(42); - let first = build_remote_dyn_filter_id( - region_id, - producer_scope_id, - 3, - &test_children(&["host", "pod"]), - ) - .unwrap(); - let second = build_remote_dyn_filter_id( - region_id, - producer_scope_id, - 3, - &test_children(&["host", "pod"]), - ) - .unwrap(); + let first = + build_remote_dyn_filter_id(producer_scope_id, 3, &test_children(&["host", "pod"])) + .unwrap(); + let second = + build_remote_dyn_filter_id(producer_scope_id, 3, &test_children(&["host", "pod"])) + .unwrap(); assert_eq!(first, second); } @@ -296,20 +263,9 @@ mod tests { #[test] fn remote_dyn_filter_id_changes_when_producer_scope_changes() { let children = test_children(&["host", "pod"]); - let baseline = build_remote_dyn_filter_id( - RegionId::new(1024, 7), - ProducerScopeId::new(42), - 3, - &children, - ) - .unwrap(); - let different_scope = build_remote_dyn_filter_id( - RegionId::new(1024, 7), - ProducerScopeId::new(43), - 3, - &children, - ) - .unwrap(); + let baseline = build_remote_dyn_filter_id(ProducerScopeId::new(42), 3, &children).unwrap(); + let different_scope = + build_remote_dyn_filter_id(ProducerScopeId::new(43), 3, &children).unwrap(); assert_ne!(baseline, different_scope); } @@ -318,37 +274,24 @@ mod tests { fn remote_dyn_filter_id_changes_when_identity_inputs_change() { let children = test_children(&["host", "pod"]); let producer_scope_id = ProducerScopeId::new(42); - let baseline = - build_remote_dyn_filter_id(RegionId::new(1024, 7), producer_scope_id, 3, &children) - .unwrap(); - let different_region = - build_remote_dyn_filter_id(RegionId::new(1024, 8), producer_scope_id, 3, &children) - .unwrap(); + let baseline = build_remote_dyn_filter_id(producer_scope_id, 3, &children).unwrap(); let different_ordinal = - build_remote_dyn_filter_id(RegionId::new(1024, 7), producer_scope_id, 4, &children) + build_remote_dyn_filter_id(producer_scope_id, 4, &children).unwrap(); + let different_children = + build_remote_dyn_filter_id(producer_scope_id, 3, &test_children(&["pod", "host"])) .unwrap(); - let different_children = build_remote_dyn_filter_id( - RegionId::new(1024, 7), - producer_scope_id, - 3, - &test_children(&["pod", "host"]), - ) - .unwrap(); - assert_ne!(baseline, different_region); assert_ne!(baseline, different_ordinal); assert_ne!(baseline, different_children); } #[test] fn remote_dyn_filter_id_supports_empty_children() { - let region_id = RegionId::new(4096, 2); let producer_scope_id = ProducerScopeId::new(42); - let first = build_remote_dyn_filter_id(region_id, producer_scope_id, 1, &[]).unwrap(); - let second = build_remote_dyn_filter_id(region_id, producer_scope_id, 1, &[]).unwrap(); + let first = build_remote_dyn_filter_id(producer_scope_id, 1, &[]).unwrap(); + let second = build_remote_dyn_filter_id(producer_scope_id, 1, &[]).unwrap(); assert_eq!(first, second); - assert_eq!(first.region_id(), region_id); assert_eq!(first.producer_scope_id(), producer_scope_id); assert_eq!(first.producer_ordinal(), 1); assert_eq!(first.children_fingerprint(), second.children_fingerprint()); @@ -356,14 +299,9 @@ mod tests { #[test] fn remote_dyn_filter_id_rejects_out_of_range_producer_ordinal() { - let error = build_remote_dyn_filter_id( - RegionId::new(1024, 7), - ProducerScopeId::new(42), - usize::MAX, - &[], - ) - .unwrap_err() - .to_string(); + let error = build_remote_dyn_filter_id(ProducerScopeId::new(42), usize::MAX, &[]) + .unwrap_err() + .to_string(); assert!(error.contains("producer ordinal out of range for filter id")); } diff --git a/src/query/src/dist_plan/remote_dyn_filter_registry.rs b/src/query/src/dist_plan/remote_dyn_filter_registry.rs index 068295d212..694763a59c 100644 --- a/src/query/src/dist_plan/remote_dyn_filter_registry.rs +++ b/src/query/src/dist_plan/remote_dyn_filter_registry.rs @@ -275,9 +275,8 @@ mod tests { QueryId::from(Uuid::from_u128(value)) } - fn test_filter_id(region_id: RegionId, producer_ordinal: u32) -> FilterId { + fn test_filter_id(producer_ordinal: u32) -> FilterId { FilterId::new( - region_id, ProducerScopeId::new(42), producer_ordinal, FilterFingerprint::new(0xabc), @@ -351,7 +350,7 @@ mod tests { fn registry_stores_filter_and_deduplicates_subscribers() { let registry = QueryDynFilterRegistry::new(test_query_id(1)); let filter = test_dyn_filter(&["host"]); - let filter_id = test_filter_id(RegionId::new(1024, 7), 1); + let filter_id = test_filter_id(1); let entry = match registry.register_remote_dyn_filter(filter_id.clone(), filter.clone()) { EntryRegistration::Inserted(entry) => entry, other => panic!("unexpected registration result: {other:?}"),