feat: fan out remote dynamic filter updates from FE (#8241)

* feat: fe send update

Signed-off-by: discord9 <discord9@163.com>

* refactor: after rebase

Signed-off-by: discord9 <discord9@163.com>

* fix: encode at least send bounds

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-06-08 16:24:57 +08:00
committed by GitHub
parent 5fd5b91b29
commit 651cf04525
4 changed files with 1269 additions and 55 deletions

View File

@@ -199,6 +199,18 @@ pub enum Error {
#[snafu(display("Invalid character in prefix config: {}", prefix))]
InvalidColumnPrefix { prefix: String },
#[snafu(display(
"DynFilterPayload::Datafusion is {} bytes, which exceeds the configured limit of {} bytes",
payload_size_bytes,
max_payload_bytes
))]
DynFilterPayloadTooLarge {
payload_size_bytes: usize,
max_payload_bytes: usize,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -229,6 +241,8 @@ impl ErrorExt for Error {
| Error::InvalidFuncArgs { .. }
| Error::InvalidColumnPrefix { .. } => StatusCode::InvalidArguments,
Error::DynFilterPayloadTooLarge { .. } => StatusCode::PlanQuery,
Error::ConvertDfRecordBatchStream { source, .. } => source.status_code(),
Error::DecodePlan { source, .. }

View File

@@ -19,10 +19,10 @@ use std::sync::Arc;
use api::v1::region::RegionRequestHeader;
use datafusion::execution::TaskContext;
use datafusion::physical_expr::expressions::Column;
use datafusion::physical_expr::expressions::{Column, InListExpr, lit};
use datafusion::physical_plan::PhysicalExpr;
use datafusion::physical_plan::joins::HashTableLookupExpr;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion_common::{DataFusionError, Result as DataFusionResult};
use datafusion_expr::LogicalPlan;
use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
@@ -31,6 +31,7 @@ use datafusion_proto::physical_plan::to_proto::serialize_physical_expr;
use datafusion_proto::protobuf::PhysicalExprNode;
use prost::Message;
use serde::{Deserialize, Serialize};
use snafu::ensure;
use store_api::storage::RegionId;
/// Current wire-format version for remote dynamic filter payload updates.
@@ -39,6 +40,7 @@ pub use self::initial_remote_dyn_filter_reg::{
INITIAL_REMOTE_DYN_FILTER_REGS_MAX_TOTAL_PROTO_BYTES, InitialDynFilterReg,
InitialDynFilterRegs, InitialDynFilterSnapshot,
};
use crate::error::{DynFilterPayloadTooLargeSnafu, Error as CommonQueryError};
pub const DYN_FILTER_PROTOCOL_VERSION: u32 = 1;
@@ -63,24 +65,23 @@ pub enum DynFilterPayload {
impl DynFilterPayload {
/// Encodes a DataFusion physical expression into a bounded dynamic filter payload.
///
/// This rejects expressions that cannot be safely shipped as dynamic filter
/// predicates and fails if the serialized payload exceeds `max_payload_bytes`.
/// Runtime-only hash lookup predicates are degraded to `true` before encoding so
/// serializable min/max bounds around them can still be shipped to remote scans.
/// If the full serializable predicate is still larger than `max_payload_bytes`, large
/// membership predicates (`IN (...)`) are also degraded to `true` as a bounds-only fallback.
pub fn from_datafusion_expr(
expr: &Arc<dyn PhysicalExpr>,
max_payload_bytes: usize,
) -> DataFusionResult<Self> {
validate_supported_payload_expr(expr)?;
let codec = DefaultPhysicalExtensionCodec {};
let proto = serialize_physical_expr(expr, &codec)?;
let mut bytes = Vec::new();
proto.encode(&mut bytes).map_err(|e| {
DataFusionError::Internal(format!("Failed to encode PhysicalExprNode: {e}"))
})?;
validate_payload_size(bytes.len(), max_payload_bytes)?;
Ok(Self::Datafusion(bytes))
match encode_remote_dyn_filter_expr(expr, max_payload_bytes, false) {
Ok(bytes) => Ok(Self::Datafusion(bytes)),
Err(CommonQueryError::DynFilterPayloadTooLarge { .. }) => {
encode_remote_dyn_filter_expr(expr, max_payload_bytes, true)
.map(Self::Datafusion)
.map_err(DataFusionError::from)
}
Err(error) => Err(DataFusionError::from(error)),
}
}
/// Decodes a DataFusion dynamic filter payload against the provided input schema.
@@ -95,7 +96,7 @@ impl DynFilterPayload {
max_payload_bytes: usize,
) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
let Self::Datafusion(bytes) = self;
validate_payload_size(bytes.len(), max_payload_bytes)?;
validate_payload_size(bytes.len(), max_payload_bytes).map_err(DataFusionError::from)?;
let codec = DefaultPhysicalExtensionCodec {};
let proto = PhysicalExprNode::decode(bytes.as_slice()).map_err(|e| {
DataFusionError::Internal(format!("Failed to decode PhysicalExprNode: {e}"))
@@ -118,13 +119,41 @@ fn encode_physical_expr_to_bytes(expr: &Arc<dyn PhysicalExpr>) -> DataFusionResu
Ok(bytes)
}
fn encode_remote_dyn_filter_expr(
expr: &Arc<dyn PhysicalExpr>,
max_payload_bytes: usize,
bounds_only: bool,
) -> Result<Vec<u8>, CommonQueryError> {
let expr = portable_remote_dyn_filter_expr(Arc::clone(expr), bounds_only)
.map_err(CommonQueryError::from)?;
let bytes = encode_physical_expr_to_bytes(&expr).map_err(CommonQueryError::from)?;
validate_payload_size(bytes.len(), max_payload_bytes)?;
Ok(bytes)
}
fn portable_remote_dyn_filter_expr(
expr: Arc<dyn PhysicalExpr>,
bounds_only: bool,
) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
expr.transform_up(|node| {
if node.as_any().is::<HashTableLookupExpr>()
|| (bounds_only && node.as_any().is::<InListExpr>())
{
Ok(Transformed::yes(lit(true)))
} else {
Ok(Transformed::no(node))
}
})
.map(|transformed| transformed.data)
}
pub(crate) fn decode_physical_expr_from_bytes(
bytes: &[u8],
task_ctx: &TaskContext,
input_schema: &datafusion::arrow::datatypes::Schema,
max_payload_bytes: usize,
) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
validate_payload_size(bytes.len(), max_payload_bytes)?;
validate_payload_size(bytes.len(), max_payload_bytes).map_err(DataFusionError::from)?;
let codec = DefaultPhysicalExtensionCodec {};
let proto = PhysicalExprNode::decode(bytes).map_err(|e| {
DataFusionError::Internal(format!("Failed to decode PhysicalExprNode: {e}"))
@@ -139,13 +168,14 @@ pub(crate) fn decode_physical_expr_from_bytes(
fn validate_payload_size(
payload_size_bytes: usize,
max_payload_bytes: usize,
) -> DataFusionResult<()> {
if payload_size_bytes > max_payload_bytes {
return Err(DataFusionError::Plan(format!(
"DynFilterPayload::Datafusion is {} bytes, which exceeds the configured limit of {} bytes",
payload_size_bytes, max_payload_bytes
)));
}
) -> Result<(), CommonQueryError> {
ensure!(
payload_size_bytes <= max_payload_bytes,
DynFilterPayloadTooLargeSnafu {
payload_size_bytes,
max_payload_bytes,
}
);
Ok(())
}
@@ -263,7 +293,11 @@ mod tests {
use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::physical_expr::expressions::Column;
use datafusion::physical_expr::expressions::{BinaryExpr, Column, InListExpr, lit};
use datafusion::physical_plan::expressions::col;
use datafusion::physical_plan::joins::join_hash_map::JoinHashMapU32;
use datafusion::physical_plan::joins::{HashTableLookupExpr, Map, SeededRandomState};
use datafusion_expr::Operator;
use super::*;
@@ -407,12 +441,114 @@ mod tests {
assert!(matches!(err, DataFusionError::Plan(_)));
}
#[test]
fn dyn_filter_payload_hash_lookup_fallback_preserves_bounds() {
let schema = Arc::new(Schema::new(vec![Field::new(
"device_id",
DataType::Int32,
false,
)]));
let device_id = col("device_id", &schema).unwrap();
let lower_bound = Arc::new(BinaryExpr::new(
Arc::clone(&device_id),
Operator::GtEq,
lit(10i32),
)) as Arc<dyn PhysicalExpr>;
let lookup = Arc::new(HashTableLookupExpr::new(
vec![Arc::clone(&device_id)],
SeededRandomState::with_seeds(0, 0, 0, 0),
Arc::new(Map::HashMap(Box::new(JoinHashMapU32::with_capacity(0)))),
"hash_lookup".to_string(),
)) as Arc<dyn PhysicalExpr>;
let expr =
Arc::new(BinaryExpr::new(lower_bound, Operator::And, lookup)) as Arc<dyn PhysicalExpr>;
let payload = DynFilterPayload::from_datafusion_expr(&expr, 1024).unwrap();
let decoded = payload
.decode_datafusion_expr(&TaskContext::default(), &schema, 1024)
.unwrap();
assert!(!contains_expr::<HashTableLookupExpr>(&decoded));
let decoded_display = decoded.to_string();
assert!(decoded_display.contains("device_id"));
assert!(decoded_display.contains(">="));
assert!(!decoded_display.contains("hash_lookup"));
}
#[test]
fn dyn_filter_payload_oversized_inlist_falls_back_to_bounds() {
let schema = Arc::new(Schema::new(vec![Field::new(
"device_id",
DataType::Int32,
false,
)]));
let device_id = col("device_id", &schema).unwrap();
let lower_bound = Arc::new(BinaryExpr::new(
Arc::clone(&device_id),
Operator::GtEq,
lit(8192i32),
)) as Arc<dyn PhysicalExpr>;
let upper_bound = Arc::new(BinaryExpr::new(
Arc::clone(&device_id),
Operator::LtEq,
lit(8255i32),
)) as Arc<dyn PhysicalExpr>;
let bounds = Arc::new(BinaryExpr::new(lower_bound, Operator::And, upper_bound))
as Arc<dyn PhysicalExpr>;
let in_list = Arc::new(
InListExpr::try_new(
Arc::clone(&device_id),
(8192..8256).map(lit).collect(),
false,
&schema,
)
.unwrap(),
) as Arc<dyn PhysicalExpr>;
let expr = Arc::new(BinaryExpr::new(Arc::clone(&bounds), Operator::And, in_list))
as Arc<dyn PhysicalExpr>;
let bounds_only = portable_remote_dyn_filter_expr(Arc::clone(&expr), true).unwrap();
let bounds_only_size = encode_physical_expr_to_bytes(&bounds_only).unwrap().len();
let full_size = encode_physical_expr_to_bytes(&expr).unwrap().len();
assert!(full_size > bounds_only_size);
let payload = DynFilterPayload::from_datafusion_expr(&expr, bounds_only_size).unwrap();
let decoded = payload
.decode_datafusion_expr(&TaskContext::default(), &schema, bounds_only_size)
.unwrap();
assert!(!contains_expr::<InListExpr>(&decoded));
let decoded_display = decoded.to_string();
assert!(decoded_display.contains("device_id"));
assert!(decoded_display.contains(">="));
assert!(decoded_display.contains("<="));
}
#[test]
fn dyn_filter_payload_rejects_oversized_payload() {
let expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("host", 0));
let err = DynFilterPayload::from_datafusion_expr(&expr, 1).unwrap_err();
assert!(matches!(err, DataFusionError::Plan(_)));
let DataFusionError::External(error) = err else {
panic!("expected external common query error, got: {err:?}");
};
assert!(matches!(
error.downcast_ref::<CommonQueryError>(),
Some(CommonQueryError::DynFilterPayloadTooLarge { .. })
));
}
fn contains_expr<T: 'static>(expr: &Arc<dyn PhysicalExpr>) -> bool {
let mut found = false;
expr.apply(|node| {
if node.as_any().is::<T>() {
found = true;
Ok(TreeNodeRecursion::Stop)
} else {
Ok(TreeNodeRecursion::Continue)
}
})
.unwrap();
found
}
}

View File

@@ -85,6 +85,23 @@ fn acquire_remote_dyn_filter_registry_lease(
)
}
fn query_context_for_remote_dyn_filter_region(
query_ctx: &QueryContextRef,
region_id: RegionId,
remote_dyn_filter_registry_lease: Option<&RemoteDynFilterRegistryLease>,
captured_dyn_filters: &[CapturedDynFilter],
) -> session::context::QueryContext {
if let Some(remote_dyn_filter_registry_lease) = remote_dyn_filter_registry_lease {
register_dyn_filters_for_region(
remote_dyn_filter_registry_lease.registry(),
region_id,
captured_dyn_filters,
);
}
query_context_with_initial_dyn_filter_regs(query_ctx, region_id, captured_dyn_filters)
}
#[derive(Debug, Hash, PartialOrd, PartialEq, Eq, Clone)]
pub struct MergeScanLogicalPlan {
/// In logical plan phase it only contains one input
@@ -346,25 +363,16 @@ impl MergeScanExec {
.step_by(target_partition)
.copied()
{
if let Some(remote_dyn_filter_registry_lease) =
remote_dyn_filter_registry_lease.as_ref()
{
register_dyn_filters_for_region(
remote_dyn_filter_registry_lease.registry(),
region_id,
&captured_remote_dyn_filters,
);
}
let region_span = tracing_context.attach(tracing::info_span!(
parent: &Span::current(),
"merge_scan_region",
region_id = %region_id,
partition = partition
));
let region_query_ctx = query_context_with_initial_dyn_filter_regs(
let region_query_ctx = query_context_for_remote_dyn_filter_region(
&query_ctx,
region_id,
remote_dyn_filter_registry_lease.as_ref(),
&captured_remote_dyn_filters,
);
let request = QueryRequest {
@@ -397,6 +405,13 @@ impl MergeScanExec {
})?;
let do_get_cost = do_get_start.elapsed();
if let Some(remote_dyn_filter_registry_lease) =
remote_dyn_filter_registry_lease.as_ref()
{
remote_dyn_filter_registry_lease
.ensure_fanout_task(region_query_handler.clone());
}
ready_timer.stop();
let mut poll_duration = Duration::ZERO;
@@ -869,6 +884,7 @@ mod tests {
use std::collections::BTreeSet;
use async_trait::async_trait;
use common_query::request::INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY;
use datafusion::config::ConfigOptions;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::filter_pushdown::ChildFilterPushdownResult;
@@ -885,13 +901,54 @@ mod tests {
use uuid::Uuid;
use super::*;
use crate::dist_plan::DynFilterRegistryManager;
use crate::dist_plan::{DynFilterRegistryManager, Subscriber};
use crate::region_query::RegionQueryHandler;
fn test_query_id(value: u128) -> QueryId {
QueryId::from(Uuid::from_u128(value))
}
#[test]
fn remote_dyn_filter_region_query_context_registers_before_do_get() {
let registry_manager = Arc::new(DynFilterRegistryManager::default());
let query_ctx = QueryContext::arc();
let query_id = query_ctx
.remote_query_id_value()
.expect("query context must have remote query id");
let lease = registry_manager.acquire_lease(query_id);
let region_id = RegionId::new(1024, 7);
let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(
vec![Arc::new(Column::new("host", 0)) as Arc<_>],
physical_lit(true) as _,
)) as Arc<dyn datafusion_physical_expr::PhysicalExpr>;
let captured = capture_remote_dyn_filters_for_pushdown(
RemoteDynFilterProducerId::new(42),
vec![dyn_filter],
);
assert_eq!(captured.captured_dyn_filters.len(), 1);
let region_query_ctx = query_context_for_remote_dyn_filter_region(
&query_ctx,
region_id,
Some(&lease),
&captured.captured_dyn_filters,
);
let entries = lease.registry().entries();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].subscribers(), vec![Subscriber::new(region_id)]);
assert!(
!entries[0].fanout_started_for_test(),
"fanout must start only after do_get succeeds"
);
assert!(
region_query_ctx
.extension(INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY)
.is_some(),
"initial RDF registrations must be present in the do_get query context"
);
}
#[test]
fn remote_dyn_filter_registry_cleanup_waits_for_last_query_scoped_stream_drop() {
let registry_manager = Arc::new(DynFilterRegistryManager::default());

File diff suppressed because it is too large Load Diff