diff --git a/src/pipeline/src/etl/processor.rs b/src/pipeline/src/etl/processor.rs index 0b553c3c9c..5413dcf379 100644 --- a/src/pipeline/src/etl/processor.rs +++ b/src/pipeline/src/etl/processor.rs @@ -298,5 +298,5 @@ pub(crate) fn yaml_new_fields(v: &yaml_rust::Yaml, field: &str) -> Result Result { - yaml_parse_string(v, field) + STRING_OR_HASH_FN(field, v) } diff --git a/src/pipeline/tests/date.rs b/src/pipeline/tests/date.rs index 775f0688c1..fc9e726b61 100644 --- a/src/pipeline/tests/date.rs +++ b/src/pipeline/tests/date.rs @@ -136,3 +136,89 @@ transform: Some(ValueData::TimestampNanosecondValue(1719440016991000000)) ); } + +#[test] +fn test_rename_with_fields() { + let pipeline_yaml = r#" +processors: +- date: + fields: + - key: input_str + rename_to: ts + formats: + - "%Y-%m-%dT%H:%M:%S%.3fZ" + +transform: +- fields: + - ts + type: time +"#; + + let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml); + assert_eq!(output.schema, *EXPECTED_SCHEMA); + assert_eq!(output.rows[0].values[0].value_data, TEST_VALUE); +} + +#[test] +fn test_rename_with_field() { + let pipeline_yaml = r#" +processors: +- date: + field: + key: input_str + rename_to: ts + formats: + - "%Y-%m-%dT%H:%M:%S%.3fZ" + +transform: +- fields: + - ts + type: time +"#; + + let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml); + assert_eq!(output.schema, *EXPECTED_SCHEMA); + assert_eq!(output.rows[0].values[0].value_data, TEST_VALUE); +} + +#[test] +fn test_rename_with_transform_fields() { + let pipeline_yaml = r#" +processors: +- date: + field: input_str + formats: + - "%Y-%m-%dT%H:%M:%S%.3fZ" + +transform: +- fields: + - key: input_str + rename_to: ts + type: time +"#; + + let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml); + assert_eq!(output.schema, *EXPECTED_SCHEMA); + assert_eq!(output.rows[0].values[0].value_data, TEST_VALUE); +} + +#[test] +fn test_rename_with_transform_field() { + let pipeline_yaml = r#" +processors: +- date: + field: input_str + formats: + - "%Y-%m-%dT%H:%M:%S%.3fZ" + +transform: +- field: + key: input_str + rename_to: ts + type: time +"#; + + let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml); + assert_eq!(output.schema, *EXPECTED_SCHEMA); + assert_eq!(output.rows[0].values[0].value_data, TEST_VALUE); +}