diff --git a/src/common/grpc/src/flight.rs b/src/common/grpc/src/flight.rs index 9d3bf65da1..c7860f5706 100644 --- a/src/common/grpc/src/flight.rs +++ b/src/common/grpc/src/flight.rs @@ -602,4 +602,76 @@ mod test { assert_eq!(actual, expected.trim()); Ok(()) } + + #[test] + fn test_affected_rows_roundtrip_through_flight_codec() { + // Verify the full FlightEncoder → FlightDecoder pipeline handles + // the new FlightMessage::AffectedRows variant with optional inline + // metrics without breaking the wire protocol. + let mut encoder = FlightEncoder::default(); + let mut decoder = FlightDecoder::default(); + + // Without metrics — same wire format as old `AffectedRows(7)`. + let encoded = encoder.encode(FlightMessage::AffectedRows { + rows: 7, + metrics: None, + }); + let decoded = decoder.try_decode(encoded.first()).unwrap().unwrap(); + assert!(matches!( + decoded, + FlightMessage::AffectedRows { + rows: 7, + metrics: None, + } + )); + + // With metrics — new capability, row count preserved. + let json = r#"{"region_watermarks":[{"region_id":1,"watermark":99}]}"#; + let encoded = encoder.encode(FlightMessage::AffectedRows { + rows: 42, + metrics: Some(json.to_string()), + }); + let decoded = decoder.try_decode(encoded.first()).unwrap().unwrap(); + assert!(matches!( + decoded, + FlightMessage::AffectedRows { + rows: 42, + metrics: Some(_), + } + )); + } + + /// Simulates the wire output of the **old** `FlightMessage::AffectedRows(usize)` + /// variant and verifies that the **new** `FlightDecoder` handles it. + #[test] + fn test_old_affected_rows_format_decoded_by_new_code() { + use arrow_flight::FlightData; + use prost::bytes::Bytes as ProstBytes; + + // The old encoder produced FlightData whose app_metadata is + // FlightMetadata { affected_rows, metrics: None }. The new + // `AffectedRows { rows, metrics: Option }` variant with + // `metrics: None` produces the exact same wire bytes. + let old_wire_bytes = FlightData { + flight_descriptor: None, + data_header: build_none_flight_msg().into(), + app_metadata: FlightMetadata { + affected_rows: Some(AffectedRows { value: 99 }), + metrics: None, // old format: no metrics field + } + .encode_to_vec() + .into(), + data_body: ProstBytes::default(), + }; + + let mut decoder = FlightDecoder::default(); + let decoded = decoder.try_decode(&old_wire_bytes).unwrap().unwrap(); + assert!(matches!( + decoded, + FlightMessage::AffectedRows { + rows: 99, + metrics: None, + } + )); + } }