From 53799c04aec0ae88ba10f86b2b23d86e110a8691 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 14 May 2026 11:21:54 +0800 Subject: [PATCH] fix: add back defensive check Signed-off-by: discord9 --- src/common/query/src/request.rs | 54 ++++++++++++++++++++++++--------- 1 file changed, 39 insertions(+), 15 deletions(-) diff --git a/src/common/query/src/request.rs b/src/common/query/src/request.rs index 92d01cc50e..22437bde71 100644 --- a/src/common/query/src/request.rs +++ b/src/common/query/src/request.rs @@ -99,6 +99,11 @@ impl DynFilterPayload { Ok(Self::Datafusion(bytes)) } + /// Decodes a DataFusion dynamic filter payload against the provided input schema. + /// + /// The decoded expression is validated to ensure column indexes stay within the receiver-side + /// schema, column names are consistent (defensive check), and the payload stays within + /// `max_payload_bytes`. pub fn decode_datafusion_expr( &self, task_ctx: &TaskContext, @@ -148,20 +153,35 @@ fn validate_supported_payload_expr(expr: &Arc) -> DataFusionRe Ok(()) } +/// Validates decoded dynamic filter physical expressions against the receiver-side schema. +/// +/// Rejects out-of-bounds column indexes and, as a defensive check, rejects columns whose +/// name disagrees with the corresponding schema field. DataFusion physical `Column` is +/// index-authoritative, so a name mismatch usually indicates a coordinator/receiver +/// schema inconsistency that should be surfaced loudly. fn validate_decoded_payload_expr( expr: &Arc, input_schema: &Schema, ) -> DataFusionResult<()> { expr.apply(|node| { - if let Some(column) = node.as_any().downcast_ref::() - && input_schema.fields().get(column.index()).is_none() - { - return Err(DataFusionError::Plan(format!( - "Decoded Column '{}' references out-of-bounds index {} for input schema of size {}", - column.name(), - column.index(), - input_schema.fields().len() - ))); + if let Some(column) = node.as_any().downcast_ref::() { + let Some(field) = input_schema.fields().get(column.index()) else { + return Err(DataFusionError::Plan(format!( + "Decoded Column '{}' references out-of-bounds index {} for input schema of size {}", + column.name(), + column.index(), + input_schema.fields().len() + ))); + }; + + if field.name() != column.name() { + return Err(DataFusionError::Plan(format!( + "Decoded Column name/index mismatch: payload has '{}' at index {}, but schema field is '{}'", + column.name(), + column.index(), + field.name() + ))); + } } Ok(TreeNodeRecursion::Continue) @@ -344,18 +364,22 @@ mod tests { } #[test] - fn dyn_filter_payload_decode_accepts_column_name_mismatch_when_index_is_valid() { + fn dyn_filter_payload_decode_rejects_column_name_index_mismatch() { let schema = Schema::new(vec![Field::new("host", DataType::Utf8, false)]); let expr: Arc = Arc::new(Column::new("service", 0)); let payload = DynFilterPayload::from_datafusion_expr(&expr, 1024).unwrap(); - let decoded = payload + let err = payload .decode_datafusion_expr(&TaskContext::default(), &schema, 1024) - .unwrap(); + .unwrap_err(); - let decoded = decoded.as_any().downcast_ref::().unwrap(); - - assert_eq!(decoded.index(), 0); + let msg = err.to_string(); + assert!( + msg.contains("name/index mismatch"), + "expected name/index mismatch error, got: {msg}" + ); + assert!(msg.contains("service")); + assert!(msg.contains("host")); } #[test]