fix: add back defensive check

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-05-14 11:21:54 +08:00
parent 519941aa05
commit 53799c04ae

View File

@@ -99,6 +99,11 @@ impl DynFilterPayload {
Ok(Self::Datafusion(bytes))
}
/// Decodes a DataFusion dynamic filter payload against the provided input schema.
///
/// Validation ensures that the payload stays within `max_payload_bytes`, that every decoded
/// column index falls within the receiver-side schema, and, as a defensive check, that each
/// decoded column name matches the schema field at that index.
pub fn decode_datafusion_expr(
&self,
task_ctx: &TaskContext,
@@ -148,20 +153,35 @@ fn validate_supported_payload_expr(expr: &Arc<dyn PhysicalExpr>) -> DataFusionRe
Ok(())
}
/// Validates decoded dynamic filter physical expressions against the receiver-side schema.
///
/// Rejects out-of-bounds column indexes and, as a defensive check, rejects columns whose
/// name disagrees with the corresponding schema field. DataFusion physical `Column` is
/// index-authoritative, so a name mismatch usually indicates a coordinator/receiver
/// schema inconsistency that should be surfaced loudly.
fn validate_decoded_payload_expr(
expr: &Arc<dyn PhysicalExpr>,
input_schema: &Schema,
) -> DataFusionResult<()> {
expr.apply(|node| {
if let Some(column) = node.as_any().downcast_ref::<Column>()
&& input_schema.fields().get(column.index()).is_none()
{
return Err(DataFusionError::Plan(format!(
"Decoded Column '{}' references out-of-bounds index {} for input schema of size {}",
column.name(),
column.index(),
input_schema.fields().len()
)));
if let Some(column) = node.as_any().downcast_ref::<Column>() {
let Some(field) = input_schema.fields().get(column.index()) else {
return Err(DataFusionError::Plan(format!(
"Decoded Column '{}' references out-of-bounds index {} for input schema of size {}",
column.name(),
column.index(),
input_schema.fields().len()
)));
};
if field.name() != column.name() {
return Err(DataFusionError::Plan(format!(
"Decoded Column name/index mismatch: payload has '{}' at index {}, but schema field is '{}'",
column.name(),
column.index(),
field.name()
)));
}
}
Ok(TreeNodeRecursion::Continue)
@@ -344,18 +364,22 @@ mod tests {
}
#[test]
fn dyn_filter_payload_decode_accepts_column_name_mismatch_when_index_is_valid() {
fn dyn_filter_payload_decode_rejects_column_name_index_mismatch() {
let schema = Schema::new(vec![Field::new("host", DataType::Utf8, false)]);
let expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("service", 0));
let payload = DynFilterPayload::from_datafusion_expr(&expr, 1024).unwrap();
let decoded = payload
let err = payload
.decode_datafusion_expr(&TaskContext::default(), &schema, 1024)
.unwrap();
.unwrap_err();
let decoded = decoded.as_any().downcast_ref::<Column>().unwrap();
assert_eq!(decoded.index(), 0);
let msg = err.to_string();
assert!(
msg.contains("name/index mismatch"),
"expected name/index mismatch error, got: {msg}"
);
assert!(msg.contains("service"));
assert!(msg.contains("host"));
}
#[test]