feat: add initial dyn filter snapshot

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-06-02 21:26:32 +08:00
parent f350ea8772
commit 583972b32e
4 changed files with 654 additions and 133 deletions

View File

@@ -34,8 +34,9 @@ use store_api::storage::RegionId;
/// Current wire-format version for remote dynamic filter payload updates.
pub use self::initial_remote_dyn_filter_reg::{
INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, InitialDynFilterReg,
InitialDynFilterRegs,
INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
INITIAL_REMOTE_DYN_FILTER_REGS_MAX_TOTAL_PROTO_BYTES, InitialDynFilterReg,
InitialDynFilterRegs, InitialDynFilterSnapshot,
};
pub const DYN_FILTER_PROTOCOL_VERSION: u32 = 1;

View File

@@ -21,11 +21,18 @@ use datafusion::physical_plan::PhysicalExpr;
use datafusion_common::Result as DataFusionResult;
use serde::{Deserialize, Serialize};
use crate::request::{decode_physical_expr_from_bytes, encode_physical_expr_to_bytes};
use crate::request::{
DynFilterPayload, decode_physical_expr_from_bytes, encode_physical_expr_to_bytes,
};
pub const INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY: &str =
"initial_remote_dyn_filter_registrations";
pub const INITIAL_REMOTE_DYN_FILTER_REGS_MAX_COUNT: usize = 64;
/// Raw encoded registration byte budget for initial remote dynamic filter registrations.
///
/// This counts DataFusion physical expression proto bytes and optional initial snapshot payload
/// bytes before serde JSON/base64 expansion. It is a payload budget, not an exact final
/// QueryContext extension string-size limit.
pub const INITIAL_REMOTE_DYN_FILTER_REGS_MAX_TOTAL_PROTO_BYTES: usize = 64 * 1024;
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
@@ -43,10 +50,10 @@ impl InitialDynFilterRegs {
self.regs.is_empty()
}
pub fn total_encoded_child_expr_bytes(&self) -> usize {
pub fn total_encoded_registration_bytes(&self) -> usize {
self.regs
.iter()
.map(InitialDynFilterReg::encoded_child_expr_bytes)
.map(InitialDynFilterReg::encoded_registration_bytes)
.sum()
}
@@ -70,11 +77,11 @@ impl InitialDynFilterRegs {
));
}
let total_proto_bytes = self.total_encoded_child_expr_bytes();
if total_proto_bytes > max_total_proto_bytes {
let total_registration_bytes = self.total_encoded_registration_bytes();
if total_registration_bytes > max_total_proto_bytes {
return Err(format!(
"InitialDynFilterRegs contains {} total child expr proto bytes, which exceeds the configured limit of {}",
total_proto_bytes, max_total_proto_bytes
"InitialDynFilterRegs contains {} total encoded registration bytes, which exceeds the configured limit of {}",
total_registration_bytes, max_total_proto_bytes
));
}
@@ -104,6 +111,26 @@ impl InitialDynFilterRegs {
pub struct InitialDynFilterReg {
/// String form of the remote dynamic filter id that this registration is for.
pub filter_id: String,
/// Encoded child expressions, one byte buffer per child. Their summed lengths are what
/// `encoded_child_expr_bytes` reports; the buffers are presumably DataFusion physical
/// expression proto bytes, as the field name says.
pub child_exprs_datafusion_proto: Vec<Vec<u8>>,
/// Optional producer-side predicate snapshot captured at initial registration time.
///
/// It is skipped when absent so older/no-snapshot registrations remain wire-compatible.
/// Datanode runtime code should treat this as an update that arrived before the runtime
/// `DynamicFilterPhysicalExpr` was created, not as registration identity metadata.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub initial_snapshot: Option<InitialDynFilterSnapshot>,
}
/// Producer-side predicate state that can be attached to an [`InitialDynFilterReg`] when the
/// initial registration is built.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct InitialDynFilterSnapshot {
/// Encoded predicate carried by the snapshot. Its raw byte length is what
/// `encoded_payload_bytes` reports and what counts against the registration byte budget.
pub payload: DynFilterPayload,
/// Producer-side snapshot generation. Receivers use this with normal update generation
/// ordering to ignore stale payloads and apply newer payloads.
pub generation: u64,
/// Whether this snapshot completes the dynamic filter stream.
///
/// 3a currently sets this conservatively to false because DataFusion exposes synchronous
/// generation/current reads but not a synchronous public completion getter.
pub is_complete: bool,
}
impl InitialDynFilterReg {
@@ -111,9 +138,15 @@ impl InitialDynFilterReg {
Self {
filter_id: filter_id.into(),
child_exprs_datafusion_proto,
initial_snapshot: None,
}
}
pub fn with_initial_snapshot(mut self, initial_snapshot: InitialDynFilterSnapshot) -> Self {
self.initial_snapshot = Some(initial_snapshot);
self
}
pub fn from_filter_id_and_children(
filter_id: impl Into<String>,
children: &[Arc<dyn PhysicalExpr>],
@@ -130,6 +163,15 @@ impl InitialDynFilterReg {
self.child_exprs_datafusion_proto.iter().map(Vec::len).sum()
}
pub fn encoded_registration_bytes(&self) -> usize {
self.encoded_child_expr_bytes()
+ self
.initial_snapshot
.as_ref()
.map(InitialDynFilterSnapshot::encoded_payload_bytes)
.unwrap_or(0)
}
pub fn decode_children(
&self,
task_ctx: &TaskContext,
@@ -150,6 +192,22 @@ impl InitialDynFilterReg {
}
}
impl InitialDynFilterSnapshot {
pub fn new(payload: DynFilterPayload, generation: u64, is_complete: bool) -> Self {
Self {
payload,
generation,
is_complete,
}
}
pub fn encoded_payload_bytes(&self) -> usize {
match &self.payload {
DynFilterPayload::Datafusion(bytes) => bytes.len(),
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -174,6 +232,59 @@ mod tests {
assert_eq!(decoded, regs);
}
/// A registration carrying a snapshot survives the extension-value encode/decode round trip,
/// including the snapshot's generation and completion flag.
#[test]
fn initial_dyn_filter_regs_json_round_trip_with_snapshot() {
    let snapshot =
        InitialDynFilterSnapshot::new(DynFilterPayload::Datafusion(vec![4, 5, 6]), 7, true);
    let reg =
        InitialDynFilterReg::new("filter-a", vec![vec![1, 2, 3]]).with_initial_snapshot(snapshot);
    let regs = InitialDynFilterRegs::new(vec![reg]);

    let encoded = regs.to_extension_value().unwrap();
    let decoded = InitialDynFilterRegs::from_extension_value(&encoded).unwrap();
    assert_eq!(decoded, regs);

    let decoded_snapshot = decoded.regs[0].initial_snapshot.as_ref().unwrap();
    assert_eq!(decoded_snapshot.generation, 7);
    assert!(decoded_snapshot.is_complete);
}
/// JSON that has no `initial_snapshot` field still decodes, with the snapshot left as `None`.
#[test]
fn initial_dyn_filter_reg_json_defaults_missing_snapshot_to_none() {
    let json_without_snapshot = r#"{"registrations":[{"filter_id":"filter-a","child_exprs_datafusion_proto":[[1,2,3]]}]}"#;

    let decoded = InitialDynFilterRegs::from_extension_value(json_without_snapshot).unwrap();

    assert_eq!(decoded.regs.len(), 1);
    let only_reg = &decoded.regs[0];
    assert!(only_reg.initial_snapshot.is_none());
}
/// The registration byte count is the child expression bytes plus the snapshot payload bytes.
#[test]
fn initial_dyn_filter_reg_encoded_registration_bytes_include_snapshot_payload() {
    let snapshot =
        InitialDynFilterSnapshot::new(DynFilterPayload::Datafusion(vec![5, 6]), 2, false);
    let reg = InitialDynFilterReg::new("filter-a", vec![vec![1, 2, 3], vec![4]])
        .with_initial_snapshot(snapshot);

    // 3 + 1 child expression bytes.
    assert_eq!(reg.encoded_child_expr_bytes(), 4);
    // The 2 snapshot payload bytes are added on top.
    assert_eq!(reg.encoded_registration_bytes(), 6);
}
#[test]
fn initial_dyn_filter_regs_validate_bounds_rejects_duplicate_filter_ids() {
let regs = InitialDynFilterRegs::new(vec![
@@ -207,7 +318,24 @@ mod tests {
let err = regs.validate_bounds(8, 5).unwrap_err();
assert!(err.contains("6 total child expr proto bytes"));
assert!(err.contains("6 total encoded registration bytes"));
}
/// Snapshot payload bytes count against the byte limit: 1 child byte + 3 snapshot bytes
/// exceeds a limit of 3.
#[test]
fn initial_dyn_filter_regs_validate_bounds_rejects_snapshot_bytes_over_limit() {
    let snapshot =
        InitialDynFilterSnapshot::new(DynFilterPayload::Datafusion(vec![2, 3, 4]), 2, false);
    let reg = InitialDynFilterReg::new("filter-a", vec![vec![1]]).with_initial_snapshot(snapshot);
    let regs = InitialDynFilterRegs::new(vec![reg]);

    let err = regs.validate_bounds(8, 3).unwrap_err();

    assert!(err.contains("4 total encoded registration bytes"));
}
#[test]

View File

@@ -16,8 +16,9 @@ use std::any::Any;
use std::sync::Arc;
use common_query::request::{
INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, InitialDynFilterReg,
InitialDynFilterRegs,
DynFilterPayload, INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
INITIAL_REMOTE_DYN_FILTER_REGS_MAX_TOTAL_PROTO_BYTES, InitialDynFilterReg,
InitialDynFilterRegs, InitialDynFilterSnapshot,
};
use datafusion::execution::TaskContext;
use datafusion_common::Result;
@@ -32,33 +33,69 @@ use crate::query_engine::QueryEngineState;
#[derive(Debug, Clone)]
pub(crate) struct CapturedDynFilter {
pub(crate) producer_scope_id: ProducerScopeId,
pub(crate) producer_local_ordinal: usize,
filter_id: FilterId,
initial_registration: InitialDynFilterReg,
pub(crate) alive_dyn_filter: Arc<DynamicFilterPhysicalExpr>,
}
#[derive(Debug, Clone)]
struct ResolvedDynFilter {
filter_id: FilterId,
alive_dyn_filter: Arc<DynamicFilterPhysicalExpr>,
children: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
pub(crate) struct RemoteDynFilterPushdown {
pub(crate) captured_dyn_filters: Vec<CapturedDynFilter>,
/// Per-parent-filter preflight readiness. `true` means the initial registration can be
/// constructed and serialized; the caller may still choose to return `PushedDown::No` until
/// datanode-side consumption is available.
pub(crate) pushed_down: Vec<bool>,
}
pub(crate) fn capture_remote_dyn_filters(
pub(crate) fn capture_remote_dyn_filters_for_pushdown(
producer_scope_id: ProducerScopeId,
parent_filters: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
) -> Vec<CapturedDynFilter> {
parent_filters
.into_iter()
.enumerate()
.filter_map(|(producer_local_ordinal, filter)| {
downcast_dynamic_filter(filter).map(|alive_dyn_filter| CapturedDynFilter {
producer_scope_id,
producer_local_ordinal,
alive_dyn_filter,
})
})
.collect()
) -> RemoteDynFilterPushdown {
let mut pushed_down = Vec::with_capacity(parent_filters.len());
let mut captured_dyn_filters = Vec::new();
for (producer_local_ordinal, filter) in parent_filters.into_iter().enumerate() {
let Some(alive_dyn_filter) = downcast_dynamic_filter(filter) else {
pushed_down.push(false);
continue;
};
match build_captured_dyn_filter(producer_scope_id, producer_local_ordinal, alive_dyn_filter)
{
Ok(captured_dyn_filter) => {
pushed_down.push(true);
captured_dyn_filters.push(captured_dyn_filter);
}
Err(error) => {
common_telemetry::warn!(error; "Remote dyn filter is not pushed down because initial registration cannot be built");
pushed_down.push(false);
}
}
}
// Initial snapshots are an optional warm-start optimization. If validation fails while any
// snapshot is attached, retry once without snapshots before rejecting base registrations. This
// broad fallback is intentional: snapshot bytes are the only optional part here, and the second
// validation below still rejects non-snapshot errors such as duplicate ids or count limits.
if has_initial_snapshots(&captured_dyn_filters)
&& let Err(error) = validate_initial_registrations_for_pushdown(&captured_dyn_filters)
{
common_telemetry::warn!(error; "Initial remote dyn filter registrations failed validation with optional snapshots; retrying without snapshots");
drop_initial_snapshots(&mut captured_dyn_filters);
}
if let Err(error) = validate_initial_registrations_for_pushdown(&captured_dyn_filters) {
common_telemetry::warn!(error; "Remote dyn filters are not pushed down because initial registrations are invalid");
return RemoteDynFilterPushdown {
captured_dyn_filters: Vec::new(),
pushed_down: vec![false; pushed_down.len()],
};
}
RemoteDynFilterPushdown {
captured_dyn_filters,
pushed_down,
}
}
fn downcast_dynamic_filter(
@@ -78,13 +115,13 @@ pub(crate) fn register_dyn_filters_for_region(
region_id: RegionId,
captured_dyn_filters: &[CapturedDynFilter],
) {
for resolved_filter in resolved_dyn_filters(captured_dyn_filters) {
for captured_dyn_filter in captured_dyn_filters {
let _ = registry.register_remote_dyn_filter(
resolved_filter.filter_id.clone(),
resolved_filter.alive_dyn_filter,
captured_dyn_filter.filter_id.clone(),
captured_dyn_filter.alive_dyn_filter.clone(),
);
let _ =
registry.register_subscriber(&resolved_filter.filter_id, Subscriber::new(region_id));
let _ = registry
.register_subscriber(&captured_dyn_filter.filter_id, Subscriber::new(region_id));
}
}
@@ -109,51 +146,117 @@ pub(crate) fn bridge_dyn_filters_for_region(
register_dyn_filters_for_region(&registry, region_id, captured_dyn_filters);
}
fn resolve_dyn_filter(captured_dyn_filter: &CapturedDynFilter) -> Result<ResolvedDynFilter> {
let children = captured_dyn_filter
.alive_dyn_filter
fn build_captured_dyn_filter(
producer_scope_id: ProducerScopeId,
producer_local_ordinal: usize,
alive_dyn_filter: Arc<DynamicFilterPhysicalExpr>,
) -> Result<CapturedDynFilter> {
let children = alive_dyn_filter
.children()
.into_iter()
.cloned()
.collect::<Vec<_>>();
let filter_id = build_remote_dyn_filter_id(
captured_dyn_filter.producer_scope_id,
captured_dyn_filter.producer_local_ordinal,
&children,
)?;
let filter_id =
build_remote_dyn_filter_id(producer_scope_id, producer_local_ordinal, &children)?;
let initial_registration =
InitialDynFilterReg::from_filter_id_and_children(filter_id.to_string(), &children)?;
Ok(ResolvedDynFilter {
Ok(CapturedDynFilter {
filter_id,
alive_dyn_filter: captured_dyn_filter.alive_dyn_filter.clone(),
children,
initial_registration: attach_stable_initial_snapshot(
initial_registration,
&alive_dyn_filter,
),
alive_dyn_filter,
})
}
fn resolved_dyn_filters(captured_dyn_filters: &[CapturedDynFilter]) -> Vec<ResolvedDynFilter> {
/// Checks that the initial registrations built from `captured_dyn_filters` pass the default
/// bounds and can be serialized into an extension value.
///
/// The serialized value itself is discarded; a failure of either step is returned as a string.
fn validate_initial_registrations_for_pushdown(
    captured_dyn_filters: &[CapturedDynFilter],
) -> std::result::Result<(), String> {
    let regs = build_initial_dyn_filter_regs_for_region(captured_dyn_filters);
    regs.validate_default_bounds()?;
    match regs.to_extension_value() {
        Ok(_) => Ok(()),
        Err(error) => Err(error.to_string()),
    }
}
/// Clears the optional initial snapshot on every captured filter, leaving the base
/// registrations untouched.
fn drop_initial_snapshots(captured_dyn_filters: &mut [CapturedDynFilter]) {
    captured_dyn_filters
        .iter_mut()
        .for_each(|captured| captured.initial_registration.initial_snapshot = None);
}
fn has_initial_snapshots(captured_dyn_filters: &[CapturedDynFilter]) -> bool {
captured_dyn_filters
.iter()
.filter_map(|captured_dyn_filter| resolve_dyn_filter(captured_dyn_filter).ok())
.collect()
.any(|captured| captured.initial_registration.initial_snapshot.is_some())
}
/// Attaches a snapshot of `alive_dyn_filter` to `initial_registration` when a stable one can be
/// read; otherwise returns the registration unchanged.
fn attach_stable_initial_snapshot(
    initial_registration: InitialDynFilterReg,
    alive_dyn_filter: &DynamicFilterPhysicalExpr,
) -> InitialDynFilterReg {
    match stable_initial_snapshot(alive_dyn_filter) {
        Some(initial_snapshot) => initial_registration.with_initial_snapshot(initial_snapshot),
        None => initial_registration,
    }
}
/// Reads the filter's current predicate as an optional initial snapshot.
///
/// Returns `None` when the generation is still at or below 1, when reading or encoding the
/// current expression fails (a warning is logged), or when the generation changes while the
/// expression is being read.
fn stable_initial_snapshot(
    alive_dyn_filter: &DynamicFilterPhysicalExpr,
) -> Option<InitialDynFilterSnapshot> {
    // DataFusion dynamic filters start at generation 1 with a neutral predicate such as `true`.
    // Shipping that value gives the remote side nothing to filter on, so wait until a real
    // producer update has advanced the generation.
    let generation_before = alive_dyn_filter.snapshot_generation();
    if generation_before <= 1 {
        return None;
    }

    // Bracket `current()` with two generation reads. This relies on DataFusion's generation
    // being monotonic for each update; if it moved while reading, skip the optional snapshot
    // and let later async updates carry the new predicate.
    let current = alive_dyn_filter
        .current()
        .map_err(|error| {
            common_telemetry::warn!(error; "Failed to read remote dyn filter initial snapshot");
        })
        .ok()?;
    let generation_after = alive_dyn_filter.snapshot_generation();
    if generation_after != generation_before {
        return None;
    }

    let payload = DynFilterPayload::from_datafusion_expr(
        &current,
        INITIAL_REMOTE_DYN_FILTER_REGS_MAX_TOTAL_PROTO_BYTES,
    )
    .map_err(|error| {
        common_telemetry::warn!(error; "Failed to encode remote dyn filter initial snapshot");
    })
    .ok()?;

    // DataFusion does not expose a synchronous public completion getter. Treat the initial
    // snapshot as a non-terminal pending update; later runtime/update handling owns completion.
    let is_complete = false;
    Some(InitialDynFilterSnapshot::new(
        payload,
        generation_after,
        is_complete,
    ))
}
fn build_initial_dyn_filter_regs_for_region(
captured_dyn_filters: &[CapturedDynFilter],
) -> InitialDynFilterRegs {
InitialDynFilterRegs::new(
resolved_dyn_filters(captured_dyn_filters)
.into_iter()
.filter_map(|resolved_filter| {
match InitialDynFilterReg::from_filter_id_and_children(
resolved_filter.filter_id.to_string(),
&resolved_filter.children,
) {
Ok(reg) => Some(reg),
Err(error) => {
common_telemetry::warn!(error; "Failed to encode initial remote dyn filter registration");
None
}
}
})
captured_dyn_filters
.iter()
.map(|captured| captured.initial_registration.clone())
.collect(),
)
}
@@ -189,12 +292,82 @@ pub(crate) fn query_context_with_initial_dyn_filter_regs(
#[cfg(test)]
mod tests {
use std::fmt;
use std::hash::{Hash, Hasher};
use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr::expressions::{Column, lit};
use session::query_id::QueryId;
use uuid::Uuid;
use super::*;
/// Test-only physical expression used as a dynamic filter child in the "unencodable
/// registration" test, where building the initial registration is expected to fail.
#[derive(Debug)]
struct UnserializableExpr;

impl fmt::Display for UnserializableExpr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("unserializable_expr")
    }
}

impl Hash for UnserializableExpr {
    // Every instance hashes the same, matching the always-true `PartialEq` below.
    fn hash<H: Hasher>(&self, state: &mut H) {
        Hash::hash("unserializable_expr", state);
    }
}

impl PartialEq for UnserializableExpr {
    // The type carries no data, so any two values are equal.
    fn eq(&self, _: &Self) -> bool {
        true
    }
}

impl Eq for UnserializableExpr {}

impl datafusion_physical_expr::PhysicalExpr for UnserializableExpr {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn data_type(
        &self,
        _input_schema: &arrow_schema::Schema,
    ) -> datafusion_common::Result<arrow_schema::DataType> {
        let boolean = arrow_schema::DataType::Boolean;
        Ok(boolean)
    }

    fn nullable(&self, _input_schema: &arrow_schema::Schema) -> datafusion_common::Result<bool> {
        Ok(false)
    }

    // Evaluates to the constant scalar `true` regardless of the batch.
    fn evaluate(
        &self,
        _batch: &common_recordbatch::DfRecordBatch,
    ) -> datafusion_common::Result<ColumnarValue> {
        let always_true = ScalarValue::Boolean(Some(true));
        Ok(ColumnarValue::Scalar(always_true))
    }

    fn children(&self) -> Vec<&Arc<dyn datafusion_physical_expr::PhysicalExpr>> {
        vec![]
    }

    // Leaf expression: the supplied children are ignored.
    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn datafusion_physical_expr::PhysicalExpr>>,
    ) -> datafusion_common::Result<Arc<dyn datafusion_physical_expr::PhysicalExpr>> {
        Ok(self)
    }

    fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self, f)
    }
}
/// Builds a deterministic `QueryId` from a raw 128-bit value.
fn test_query_id(value: u128) -> QueryId {
    let uuid = Uuid::from_u128(value);
    uuid.into()
}
@@ -203,8 +376,40 @@ mod tests {
ProducerScopeId::new(value)
}
fn test_captured_dyn_filter(
producer_scope_id: ProducerScopeId,
producer_local_ordinal: usize,
column_name: &str,
column_index: usize,
) -> CapturedDynFilter {
build_captured_dyn_filter(
producer_scope_id,
producer_local_ordinal,
Arc::new(DynamicFilterPhysicalExpr::new(
vec![Arc::new(Column::new(column_name, column_index)) as Arc<_>],
lit(true) as _,
)),
)
.unwrap()
}
fn test_dyn_filter_with_snapshot_payload(
column_name: &str,
column_index: usize,
payload_bytes: usize,
) -> Arc<DynamicFilterPhysicalExpr> {
let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(
vec![Arc::new(Column::new(column_name, column_index)) as Arc<_>],
lit(true) as _,
));
dyn_filter
.update(lit(ScalarValue::Utf8(Some("x".repeat(payload_bytes)))) as _)
.unwrap();
dyn_filter
}
#[test]
fn capture_remote_dyn_filters_preserves_parent_filter_ordinals() {
fn capture_remote_dyn_filters_for_pushdown_preserves_parent_filter_ordinals() {
let parent_filters = vec![
Arc::new(Column::new("service", 0)) as Arc<dyn datafusion::physical_plan::PhysicalExpr>,
Arc::new(DynamicFilterPhysicalExpr::new(
@@ -219,26 +424,182 @@ mod tests {
];
let producer_scope_id = test_producer_scope(42);
let captured = capture_remote_dyn_filters(producer_scope_id, parent_filters);
let captured = capture_remote_dyn_filters_for_pushdown(producer_scope_id, parent_filters)
.captured_dyn_filters;
assert_eq!(captured.len(), 2);
assert_eq!(captured[0].producer_scope_id, producer_scope_id);
assert_eq!(captured[1].producer_scope_id, producer_scope_id);
assert_eq!(captured[0].producer_local_ordinal, 1);
assert_eq!(captured[1].producer_local_ordinal, 3);
assert_eq!(captured[0].filter_id.producer_scope_id(), producer_scope_id);
assert_eq!(captured[1].filter_id.producer_scope_id(), producer_scope_id);
assert_eq!(captured[0].filter_id.producer_ordinal(), 1);
assert_eq!(captured[1].filter_id.producer_ordinal(), 3);
}
/// Only the dynamic filter among the parent filters is marked ready and captured; it keeps its
/// parent-filter ordinal and, being un-updated, carries no snapshot.
#[test]
fn capture_remote_dyn_filters_for_pushdown_marks_only_valid_initial_regs() {
    let service =
        Arc::new(Column::new("service", 0)) as Arc<dyn datafusion::physical_plan::PhysicalExpr>;
    let host_dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(
        vec![Arc::new(Column::new("host", 1)) as Arc<_>],
        lit(true) as _,
    )) as Arc<dyn datafusion::physical_plan::PhysicalExpr>;
    let zone = Arc::new(Column::new("zone", 2)) as Arc<dyn datafusion::physical_plan::PhysicalExpr>;
    let producer_scope_id = test_producer_scope(42);

    let pushdown = capture_remote_dyn_filters_for_pushdown(
        producer_scope_id,
        vec![service, host_dyn_filter, zone],
    );

    assert_eq!(pushdown.pushed_down, vec![false, true, false]);
    assert_eq!(pushdown.captured_dyn_filters.len(), 1);

    let captured = &pushdown.captured_dyn_filters[0];
    assert_eq!(captured.filter_id.producer_scope_id(), producer_scope_id);
    assert_eq!(captured.filter_id.producer_ordinal(), 1);
    assert!(captured.initial_registration.initial_snapshot.is_none());
}
/// A dynamic filter whose child cannot be encoded is reported as not pushed down and is not
/// captured.
#[test]
fn capture_remote_dyn_filters_for_pushdown_rejects_unencodable_registration() {
    let unencodable_filter = Arc::new(DynamicFilterPhysicalExpr::new(
        vec![Arc::new(UnserializableExpr) as Arc<_>],
        lit(true) as _,
    )) as Arc<dyn datafusion::physical_plan::PhysicalExpr>;

    let RemoteDynFilterPushdown {
        captured_dyn_filters,
        pushed_down,
    } = capture_remote_dyn_filters_for_pushdown(test_producer_scope(42), vec![unencodable_filter]);

    assert_eq!(pushed_down, vec![false]);
    assert!(captured_dyn_filters.is_empty());
}
/// A dynamic filter that has never been updated is captured without an initial snapshot.
#[test]
fn capture_remote_dyn_filters_for_pushdown_omits_neutral_initial_snapshot() {
    let untouched_filter = Arc::new(DynamicFilterPhysicalExpr::new(
        vec![Arc::new(Column::new("host", 1)) as Arc<_>],
        lit(true) as _,
    )) as Arc<dyn datafusion::physical_plan::PhysicalExpr>;

    let pushdown =
        capture_remote_dyn_filters_for_pushdown(test_producer_scope(42), vec![untouched_filter]);

    assert_eq!(pushdown.pushed_down, vec![true]);
    let captured = &pushdown.captured_dyn_filters[0];
    assert!(captured.initial_registration.initial_snapshot.is_none());
}
/// After one producer update the captured filter carries a snapshot at generation 2 that is
/// not marked complete and has a non-empty payload.
#[test]
fn capture_remote_dyn_filters_for_pushdown_attaches_stable_initial_snapshot() {
    let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(
        vec![Arc::new(Column::new("host", 1)) as Arc<_>],
        lit(true) as _,
    ));
    dyn_filter.update(lit(false) as _).unwrap();

    let pushdown = capture_remote_dyn_filters_for_pushdown(
        test_producer_scope(42),
        vec![dyn_filter as Arc<dyn datafusion::physical_plan::PhysicalExpr>],
    );

    assert_eq!(pushdown.pushed_down, vec![true]);
    let captured = &pushdown.captured_dyn_filters[0];
    let snapshot = captured
        .initial_registration
        .initial_snapshot
        .as_ref()
        .unwrap();
    assert_eq!(snapshot.generation, 2);
    assert!(!snapshot.is_complete);
    let DynFilterPayload::Datafusion(bytes) = &snapshot.payload;
    assert!(!bytes.is_empty());
}
/// Two snapshots of roughly 40 KiB each exceed the total byte budget together, so the snapshots
/// are dropped while both base registrations stay pushed down.
#[test]
fn capture_remote_dyn_filters_for_pushdown_drops_oversized_snapshots_and_keeps_regs() {
    let parent_filters = ["host", "pod"]
        .into_iter()
        .enumerate()
        .map(|(column_index, column_name)| {
            test_dyn_filter_with_snapshot_payload(column_name, column_index, 40 * 1024)
                as Arc<dyn datafusion::physical_plan::PhysicalExpr>
        })
        .collect::<Vec<_>>();

    let pushdown =
        capture_remote_dyn_filters_for_pushdown(test_producer_scope(42), parent_filters);

    assert_eq!(pushdown.pushed_down, vec![true, true]);
    assert_eq!(pushdown.captured_dyn_filters.len(), 2);
    for captured in &pushdown.captured_dyn_filters {
        assert!(captured.initial_registration.initial_snapshot.is_none());
    }
}
/// With 65 registrations the set is still invalid after the snapshot-free retry, so nothing is
/// captured and every filter is reported as not pushed down.
#[test]
fn capture_remote_dyn_filters_for_pushdown_rejects_regs_still_invalid_after_snapshot_drop() {
    const TOO_MANY_INITIAL_REGS: usize = 65;

    let mut parent_filters = Vec::with_capacity(TOO_MANY_INITIAL_REGS);
    for ordinal in 0..TOO_MANY_INITIAL_REGS {
        let column_name = format!("host_{ordinal}");
        parent_filters.push(test_dyn_filter_with_snapshot_payload(&column_name, ordinal, 1)
            as Arc<dyn datafusion::physical_plan::PhysicalExpr>);
    }

    let pushdown =
        capture_remote_dyn_filters_for_pushdown(test_producer_scope(42), parent_filters);

    assert!(pushdown.captured_dyn_filters.is_empty());
    assert_eq!(pushdown.pushed_down.len(), TOO_MANY_INITIAL_REGS);
    assert!(pushdown.pushed_down.iter().all(|pushed| !pushed));
}
/// 65 snapshot-free dynamic filters exceed the default bounds, so nothing is captured and every
/// filter is reported as not pushed down.
#[test]
fn capture_remote_dyn_filters_for_pushdown_rejects_regs_exceeding_bounds() {
    const TOO_MANY_INITIAL_REGS: usize = 65;

    let mut parent_filters = Vec::with_capacity(TOO_MANY_INITIAL_REGS);
    for _ in 0..TOO_MANY_INITIAL_REGS {
        parent_filters.push(Arc::new(DynamicFilterPhysicalExpr::new(
            vec![Arc::new(Column::new("host", 0)) as Arc<_>],
            lit(true) as _,
        )) as Arc<dyn datafusion::physical_plan::PhysicalExpr>);
    }

    let pushdown =
        capture_remote_dyn_filters_for_pushdown(test_producer_scope(42), parent_filters);

    assert!(pushdown.captured_dyn_filters.is_empty());
    assert_eq!(pushdown.pushed_down.len(), TOO_MANY_INITIAL_REGS);
    assert!(pushdown.pushed_down.iter().all(|pushed| !pushed));
}
#[test]
fn register_dyn_filters_for_region_reuses_existing_entry() {
let registry = QueryDynFilterRegistry::new(test_query_id(1));
let captured_dyn_filters = vec![CapturedDynFilter {
producer_scope_id: test_producer_scope(42),
producer_local_ordinal: 2,
alive_dyn_filter: Arc::new(DynamicFilterPhysicalExpr::new(
vec![Arc::new(Column::new("host", 0)) as Arc<_>],
lit(true) as _,
)),
}];
let captured_dyn_filters = vec![test_captured_dyn_filter(
test_producer_scope(42),
2,
"host",
0,
)];
let first_region_id = RegionId::new(1024, 7);
let second_region_id = RegionId::new(1024, 8);
@@ -270,14 +631,8 @@ mod tests {
fn register_dyn_filters_for_region_keeps_independent_producer_scopes_distinct() {
let registry = QueryDynFilterRegistry::new(test_query_id(1));
let region_id = RegionId::new(1024, 7);
let make_filter = |producer_scope_id| CapturedDynFilter {
producer_scope_id,
producer_local_ordinal: 2,
alive_dyn_filter: Arc::new(DynamicFilterPhysicalExpr::new(
vec![Arc::new(Column::new("host", 0)) as Arc<_>],
lit(true) as _,
)),
};
let make_filter =
|producer_scope_id| test_captured_dyn_filter(producer_scope_id, 2, "host", 0);
register_dyn_filters_for_region(
&registry,
@@ -295,14 +650,12 @@ mod tests {
#[test]
fn query_context_includes_region_initial_dyn_filter_regs() {
let captured_dyn_filters = vec![CapturedDynFilter {
producer_scope_id: test_producer_scope(42),
producer_local_ordinal: 2,
alive_dyn_filter: Arc::new(DynamicFilterPhysicalExpr::new(
vec![Arc::new(Column::new("host", 0)) as Arc<_>],
lit(true) as _,
)),
}];
let captured_dyn_filters = vec![test_captured_dyn_filter(
test_producer_scope(42),
2,
"host",
0,
)];
let region_id = RegionId::new(1024, 7);
let query_ctx = QueryContext::arc();
@@ -326,15 +679,11 @@ mod tests {
1024,
)
.unwrap();
let expected_filter_id = build_remote_dyn_filter_id(
captured_dyn_filters[0].producer_scope_id,
captured_dyn_filters[0].producer_local_ordinal,
&[Arc::new(Column::new("host", 0)) as Arc<_>],
)
.unwrap();
assert_eq!(regs.regs.len(), 1);
assert_eq!(regs.regs[0].filter_id, expected_filter_id.to_string());
assert_eq!(
regs.regs[0].filter_id,
captured_dyn_filters[0].filter_id.to_string()
);
assert_eq!(decoded_children.len(), 1);
assert!(decoded_children[0].as_any().is::<Column>());
}
@@ -342,22 +691,8 @@ mod tests {
#[test]
fn query_context_drops_initial_regs_when_duplicate_filter_ids_exceed_bounds() {
let captured_dyn_filters = vec![
CapturedDynFilter {
producer_scope_id: test_producer_scope(42),
producer_local_ordinal: 2,
alive_dyn_filter: Arc::new(DynamicFilterPhysicalExpr::new(
vec![Arc::new(Column::new("host", 0)) as Arc<_>],
lit(true) as _,
)),
},
CapturedDynFilter {
producer_scope_id: test_producer_scope(42),
producer_local_ordinal: 2,
alive_dyn_filter: Arc::new(DynamicFilterPhysicalExpr::new(
vec![Arc::new(Column::new("host", 0)) as Arc<_>],
lit(true) as _,
)),
},
test_captured_dyn_filter(test_producer_scope(42), 2, "host", 0),
test_captured_dyn_filter(test_producer_scope(42), 2, "host", 0),
];
let region_id = RegionId::new(1024, 7);
let query_ctx = QueryContext::arc();

View File

@@ -39,7 +39,7 @@ use datafusion::physical_plan::{
};
use datafusion_common::{Column as ColumnExpr, DataFusionError, Result};
use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr};
use futures_util::StreamExt;
use greptime_proto::v1::region::RegionRequestHeader;
@@ -54,7 +54,7 @@ use tracing::{Instrument, Span};
use crate::dist_plan::analyzer::AliasMapping;
use crate::dist_plan::analyzer::utils::patch_batch_timezone;
use crate::dist_plan::dyn_filter_bridge::{
CapturedDynFilter, bridge_dyn_filters_for_region, capture_remote_dyn_filters,
CapturedDynFilter, bridge_dyn_filters_for_region, capture_remote_dyn_filters_for_pushdown,
query_context_with_initial_dyn_filter_regs,
};
use crate::dist_plan::{ProducerScopeId, RemoteDynFilterRegistryLease};
@@ -715,23 +715,25 @@ impl ExecutionPlan for MergeScanExec {
.into_iter()
.map(|filter| filter.filter)
.collect::<Vec<_>>();
let supported = parent_filters
.iter()
.map(|filter| filter.as_any().is::<DynamicFilterPhysicalExpr>())
.collect::<Vec<_>>();
let remote_dyn_filter_pushdown = capture_remote_dyn_filters_for_pushdown(
self.remote_dyn_filter_producer_scope_id,
parent_filters,
);
*self.captured_remote_dyn_filters.lock().unwrap() =
capture_remote_dyn_filters(self.remote_dyn_filter_producer_scope_id, parent_filters);
remote_dyn_filter_pushdown.captured_dyn_filters;
let new_self = Arc::new(self.clone());
Ok(FilterPushdownPropagation {
filters: supported
filters: remote_dyn_filter_pushdown
.pushed_down
.into_iter()
.map(|supported| {
if supported {
PushedDown::Yes
} else {
PushedDown::No
}
.map(|_pushdown_ready| {
// TODO(remote-dyn-filter): Return `PushedDown::Yes` for `_pushdown_ready`
// filters after datanode-side initial registration consumption and runtime
// pending-update application are implemented. Until then, keep the parent-side
// filter as a correctness fallback because the remote side may ignore the
// registration carried in QueryContext.
PushedDown::No
})
.collect(),
updated_node: Some(new_self),
@@ -851,11 +853,15 @@ mod tests {
use std::collections::BTreeSet;
use async_trait::async_trait;
use datafusion::config::ConfigOptions;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::filter_pushdown::ChildFilterPushdownResult;
use datafusion_common::TableReference;
use datafusion_expr::{LogicalPlanBuilder, lit};
use datafusion_physical_expr::Distribution;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::expressions::{
Column, DynamicFilterPhysicalExpr, lit as physical_lit,
};
use session::ReadPreference;
use session::context::QueryContext;
use session::query_id::QueryId;
@@ -997,4 +1003,55 @@ mod tests {
"try_with_new_distribution must preserve producer scope id"
);
}
/// The merge scan captures a pushdown-ready dynamic filter but still answers `PushedDown::No`,
/// so the parent keeps evaluating the filter itself.
#[test]
fn remote_dyn_filter_preflight_keeps_parent_filter_until_dn_runtime_is_ready() {
    let plan = LogicalPlanBuilder::empty(true)
        .project(vec![lit(1i32).alias("col1")])
        .unwrap()
        .build()
        .unwrap();
    let schema = plan.schema().as_arrow().clone();
    let session_state = SessionStateBuilder::new().build();

    let exec = MergeScanExec::new(
        &session_state,
        TableName::new("catalog", "schema", "table"),
        vec![RegionId::new(1024, 1)],
        plan,
        &schema,
        Arc::new(TestRegionQueryHandler),
        QueryContext::arc(),
        1,
        AliasMapping::new(),
        ProducerScopeId::new(42),
    )
    .unwrap();

    let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(
        vec![Arc::new(Column::new("host", 0)) as Arc<_>],
        physical_lit(true) as _,
    )) as Arc<dyn datafusion_physical_expr::PhysicalExpr>;
    let child_pushdown_result = ChildPushdownResult {
        parent_filters: vec![ChildFilterPushdownResult {
            filter: dyn_filter,
            child_results: vec![PushedDown::Yes],
        }],
        self_filters: Vec::new(),
    };

    let propagation = exec
        .handle_child_pushdown_result(
            FilterPushdownPhase::Post,
            child_pushdown_result,
            &ConfigOptions::new(),
        )
        .unwrap();

    assert_eq!(exec.captured_remote_dyn_filters().len(), 1);
    assert_eq!(propagation.filters.len(), 1);
    assert!(matches!(propagation.filters[0], PushedDown::No));
}
}