fix: exclude region from remote dyn filter id

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-06-01 15:22:07 +08:00
parent 15b1d2043e
commit 10e26ba07e
3 changed files with 52 additions and 113 deletions

View File

@@ -84,7 +84,7 @@ pub(crate) fn register_dyn_filters_for_region(
region_id: RegionId,
captured_dyn_filters: &[CapturedDynFilter],
) {
for resolved_filter in resolved_dyn_filters(region_id, captured_dyn_filters) {
for resolved_filter in resolved_dyn_filters(captured_dyn_filters) {
let _ = registry.register_remote_dyn_filter(
resolved_filter.filter_id.clone(),
resolved_filter.alive_dyn_filter,
@@ -115,10 +115,7 @@ pub(crate) fn bridge_dyn_filters_for_region(
register_dyn_filters_for_region(&registry, region_id, captured_dyn_filters);
}
fn resolve_dyn_filter(
region_id: RegionId,
captured_dyn_filter: &CapturedDynFilter,
) -> Result<ResolvedDynFilter> {
fn resolve_dyn_filter(captured_dyn_filter: &CapturedDynFilter) -> Result<ResolvedDynFilter> {
let children = captured_dyn_filter
.alive_dyn_filter
.children()
@@ -126,7 +123,6 @@ fn resolve_dyn_filter(
.cloned()
.collect::<Vec<_>>();
let filter_id = build_remote_dyn_filter_id(
region_id,
captured_dyn_filter.producer_scope_id,
captured_dyn_filter.producer_local_ordinal,
&children,
@@ -139,22 +135,18 @@ fn resolve_dyn_filter(
})
}
fn resolved_dyn_filters(
region_id: RegionId,
captured_dyn_filters: &[CapturedDynFilter],
) -> Vec<ResolvedDynFilter> {
fn resolved_dyn_filters(captured_dyn_filters: &[CapturedDynFilter]) -> Vec<ResolvedDynFilter> {
captured_dyn_filters
.iter()
.filter_map(|captured_dyn_filter| resolve_dyn_filter(region_id, captured_dyn_filter).ok())
.filter_map(|captured_dyn_filter| resolve_dyn_filter(captured_dyn_filter).ok())
.collect()
}
fn build_initial_dyn_filter_regs_for_region(
region_id: RegionId,
captured_dyn_filters: &[CapturedDynFilter],
) -> InitialDynFilterRegs {
InitialDynFilterRegs::new(
resolved_dyn_filters(region_id, captured_dyn_filters)
resolved_dyn_filters(captured_dyn_filters)
.into_iter()
.filter_map(|resolved_filter| {
match InitialDynFilterReg::from_filter_id_and_children(
@@ -178,7 +170,7 @@ pub(crate) fn query_context_with_initial_dyn_filter_regs(
captured_dyn_filters: &[CapturedDynFilter],
) -> QueryContext {
let mut region_query_ctx = query_ctx.as_ref().clone();
let regs = build_initial_dyn_filter_regs_for_region(region_id, captured_dyn_filters);
let regs = build_initial_dyn_filter_regs_for_region(captured_dyn_filters);
if regs.is_empty() {
return region_query_ctx;
}
@@ -249,10 +241,11 @@ mod tests {
lit(true) as _,
)),
}];
let region_id = RegionId::new(1024, 7);
let first_region_id = RegionId::new(1024, 7);
let second_region_id = RegionId::new(1024, 8);
register_dyn_filters_for_region(&registry, region_id, &captured_dyn_filters);
register_dyn_filters_for_region(&registry, region_id, &captured_dyn_filters);
register_dyn_filters_for_region(&registry, first_region_id, &captured_dyn_filters);
register_dyn_filters_for_region(&registry, second_region_id, &captured_dyn_filters);
assert_eq!(registry.entry_count(), 1);
let entry = registry.entries().pop().unwrap();
@@ -261,8 +254,18 @@ mod tests {
test_producer_scope(42)
);
assert_eq!(entry.filter_id().producer_ordinal(), 2);
assert_eq!(entry.subscribers().len(), 1);
assert_eq!(entry.subscribers()[0].region_id(), region_id);
let subscribers = entry.subscribers();
assert_eq!(subscribers.len(), 2);
assert!(
subscribers
.iter()
.any(|subscriber| subscriber.region_id() == first_region_id)
);
assert!(
subscribers
.iter()
.any(|subscriber| subscriber.region_id() == second_region_id)
);
}
#[test]
@@ -326,7 +329,6 @@ mod tests {
)
.unwrap();
let expected_filter_id = build_remote_dyn_filter_id(
region_id,
captured_dyn_filters[0].producer_scope_id,
captured_dyn_filters[0].producer_local_ordinal,
&[Arc::new(Column::new("host", 0)) as Arc<_>],

View File

@@ -23,7 +23,6 @@ use datafusion_physical_expr::PhysicalExpr;
use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
use datafusion_proto::physical_plan::to_proto::serialize_physical_expr;
use prost::Message;
use store_api::storage::RegionId;
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct FilterFingerprint(u64);
@@ -81,7 +80,6 @@ impl FromStr for ProducerScopeId {
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct FilterId {
region_id: RegionId,
producer_scope_id: ProducerScopeId,
producer_ordinal: u32,
children_fingerprint: FilterFingerprint,
@@ -93,23 +91,17 @@ pub struct FilterId {
impl FilterId {
pub fn new(
region_id: RegionId,
producer_scope_id: ProducerScopeId,
producer_ordinal: u32,
children_fingerprint: FilterFingerprint,
) -> Self {
Self {
region_id,
producer_scope_id,
producer_ordinal,
children_fingerprint,
}
}
pub fn region_id(&self) -> RegionId {
self.region_id
}
pub fn producer_ordinal(&self) -> u32 {
self.producer_ordinal
}
@@ -127,11 +119,8 @@ impl Display for FilterId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}:{}:{}:{}",
self.region_id.as_u64(),
self.producer_scope_id,
self.producer_ordinal,
self.children_fingerprint
"{}:{}:{}",
self.producer_scope_id, self.producer_ordinal, self.children_fingerprint
)
}
}
@@ -141,12 +130,6 @@ impl FromStr for FilterId {
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut parts = s.split(':');
let region_id = parts
.next()
.ok_or(ParseFilterIdError)?
.parse::<u64>()
.map(RegionId::from_u64)
.map_err(|_| ParseFilterIdError)?;
let producer_scope_id = parts
.next()
.ok_or(ParseFilterIdError)?
@@ -167,7 +150,6 @@ impl FromStr for FilterId {
}
Ok(Self::new(
region_id,
producer_scope_id,
producer_local_ordinal,
children_fingerprint,
@@ -186,15 +168,14 @@ impl Display for ParseFilterIdError {
/// Builds the query-local remote dynamic filter identity.
///
/// The identity is `region_id + producer scope + producer-local ordinal + canonicalized child fingerprint`.
/// Subscriber routing details such as `partition` stay outside this key so they can remain in
/// the later fanout/subscriber map instead of splitting one shared remote filter state.
/// The identity is `producer scope + producer-local ordinal + canonicalized child fingerprint`.
/// Subscriber routing details such as `region_id` and `partition` stay outside this key so they
/// can remain in the later fanout/subscriber map instead of splitting one shared remote filter state.
///
/// NOTE(remote-dyn-filter): This id is generated once on the source side and then propagated.
/// Consumers should reuse the propagated `FilterId` instead of independently recomputing it from
/// local state.
pub(crate) fn build_remote_dyn_filter_id(
region_id: RegionId,
producer_scope_id: ProducerScopeId,
producer_local_ordinal: usize,
children: &[Arc<dyn PhysicalExpr>],
@@ -205,7 +186,6 @@ pub(crate) fn build_remote_dyn_filter_id(
DataFusionError::Execution("producer ordinal out of range for filter id".to_string())
})?;
Ok(FilterId::new(
region_id,
producer_scope_id,
producer_local_ordinal,
children_fingerprint,
@@ -248,24 +228,20 @@ mod tests {
#[test]
fn filter_id_round_trips_through_string() {
let filter_id = FilterId::new(
RegionId::new(1024, 7),
ProducerScopeId::new(42),
3,
FilterFingerprint::new(0xabc),
);
let filter_id = FilterId::new(ProducerScopeId::new(42), 3, FilterFingerprint::new(0xabc));
let encoded = filter_id.to_string();
assert_eq!(encoded, "000000000000002a:3:0000000000000abc");
assert_eq!(encoded.parse::<FilterId>().unwrap(), filter_id);
}
#[test]
fn filter_id_rejects_malformed_strings() {
assert!("".parse::<FilterId>().is_err());
assert!("1024:3".parse::<FilterId>().is_err());
assert!("1024:0000000000000001:3:zzzz".parse::<FilterId>().is_err());
assert!("0000000000000001:3".parse::<FilterId>().is_err());
assert!("0000000000000001:3:zzzz".parse::<FilterId>().is_err());
assert!(
"1024:0000000000000001:3:0000000000000abc:extra"
"0000000000000001:3:0000000000000abc:extra"
.parse::<FilterId>()
.is_err()
);
@@ -273,22 +249,13 @@ mod tests {
#[test]
fn remote_dyn_filter_id_is_stable_for_equivalent_children() {
let region_id = RegionId::new(1024, 7);
let producer_scope_id = ProducerScopeId::new(42);
let first = build_remote_dyn_filter_id(
region_id,
producer_scope_id,
3,
&test_children(&["host", "pod"]),
)
.unwrap();
let second = build_remote_dyn_filter_id(
region_id,
producer_scope_id,
3,
&test_children(&["host", "pod"]),
)
.unwrap();
let first =
build_remote_dyn_filter_id(producer_scope_id, 3, &test_children(&["host", "pod"]))
.unwrap();
let second =
build_remote_dyn_filter_id(producer_scope_id, 3, &test_children(&["host", "pod"]))
.unwrap();
assert_eq!(first, second);
}
@@ -296,20 +263,9 @@ mod tests {
#[test]
fn remote_dyn_filter_id_changes_when_producer_scope_changes() {
let children = test_children(&["host", "pod"]);
let baseline = build_remote_dyn_filter_id(
RegionId::new(1024, 7),
ProducerScopeId::new(42),
3,
&children,
)
.unwrap();
let different_scope = build_remote_dyn_filter_id(
RegionId::new(1024, 7),
ProducerScopeId::new(43),
3,
&children,
)
.unwrap();
let baseline = build_remote_dyn_filter_id(ProducerScopeId::new(42), 3, &children).unwrap();
let different_scope =
build_remote_dyn_filter_id(ProducerScopeId::new(43), 3, &children).unwrap();
assert_ne!(baseline, different_scope);
}
@@ -318,37 +274,24 @@ mod tests {
fn remote_dyn_filter_id_changes_when_identity_inputs_change() {
let children = test_children(&["host", "pod"]);
let producer_scope_id = ProducerScopeId::new(42);
let baseline =
build_remote_dyn_filter_id(RegionId::new(1024, 7), producer_scope_id, 3, &children)
.unwrap();
let different_region =
build_remote_dyn_filter_id(RegionId::new(1024, 8), producer_scope_id, 3, &children)
.unwrap();
let baseline = build_remote_dyn_filter_id(producer_scope_id, 3, &children).unwrap();
let different_ordinal =
build_remote_dyn_filter_id(RegionId::new(1024, 7), producer_scope_id, 4, &children)
build_remote_dyn_filter_id(producer_scope_id, 4, &children).unwrap();
let different_children =
build_remote_dyn_filter_id(producer_scope_id, 3, &test_children(&["pod", "host"]))
.unwrap();
let different_children = build_remote_dyn_filter_id(
RegionId::new(1024, 7),
producer_scope_id,
3,
&test_children(&["pod", "host"]),
)
.unwrap();
assert_ne!(baseline, different_region);
assert_ne!(baseline, different_ordinal);
assert_ne!(baseline, different_children);
}
#[test]
fn remote_dyn_filter_id_supports_empty_children() {
let region_id = RegionId::new(4096, 2);
let producer_scope_id = ProducerScopeId::new(42);
let first = build_remote_dyn_filter_id(region_id, producer_scope_id, 1, &[]).unwrap();
let second = build_remote_dyn_filter_id(region_id, producer_scope_id, 1, &[]).unwrap();
let first = build_remote_dyn_filter_id(producer_scope_id, 1, &[]).unwrap();
let second = build_remote_dyn_filter_id(producer_scope_id, 1, &[]).unwrap();
assert_eq!(first, second);
assert_eq!(first.region_id(), region_id);
assert_eq!(first.producer_scope_id(), producer_scope_id);
assert_eq!(first.producer_ordinal(), 1);
assert_eq!(first.children_fingerprint(), second.children_fingerprint());
@@ -356,14 +299,9 @@ mod tests {
#[test]
fn remote_dyn_filter_id_rejects_out_of_range_producer_ordinal() {
let error = build_remote_dyn_filter_id(
RegionId::new(1024, 7),
ProducerScopeId::new(42),
usize::MAX,
&[],
)
.unwrap_err()
.to_string();
let error = build_remote_dyn_filter_id(ProducerScopeId::new(42), usize::MAX, &[])
.unwrap_err()
.to_string();
assert!(error.contains("producer ordinal out of range for filter id"));
}

View File

@@ -275,9 +275,8 @@ mod tests {
QueryId::from(Uuid::from_u128(value))
}
fn test_filter_id(region_id: RegionId, producer_ordinal: u32) -> FilterId {
fn test_filter_id(producer_ordinal: u32) -> FilterId {
FilterId::new(
region_id,
ProducerScopeId::new(42),
producer_ordinal,
FilterFingerprint::new(0xabc),
@@ -351,7 +350,7 @@ mod tests {
fn registry_stores_filter_and_deduplicates_subscribers() {
let registry = QueryDynFilterRegistry::new(test_query_id(1));
let filter = test_dyn_filter(&["host"]);
let filter_id = test_filter_id(RegionId::new(1024, 7), 1);
let filter_id = test_filter_id(1);
let entry = match registry.register_remote_dyn_filter(filter_id.clone(), filter.clone()) {
EntryRegistration::Inserted(entry) => entry,
other => panic!("unexpected registration result: {other:?}"),