mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-27 00:19:58 +00:00
fix: pipeline prepare loop break detects a conditional error (#4593)
This commit is contained in:
@@ -284,7 +284,7 @@ where
|
||||
let mut search_from = 0;
|
||||
// because of the key in the json map is ordered
|
||||
for (payload_key, payload_value) in map.into_iter() {
|
||||
if search_from >= self.required_keys.len() - 1 {
|
||||
if search_from >= self.required_keys.len() {
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -359,15 +359,16 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_pipeline_prepare() {
|
||||
let input_value_str = r#"
|
||||
{
|
||||
let input_value_str = r#"
|
||||
{
|
||||
"my_field": "1,2",
|
||||
"foo": "bar"
|
||||
}
|
||||
"#;
|
||||
let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();
|
||||
let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();
|
||||
|
||||
let pipeline_yaml = r#"
|
||||
let pipeline_yaml = r#"
|
||||
---
|
||||
description: Pipeline for Apache Tomcat
|
||||
|
||||
@@ -381,32 +382,73 @@ transform:
|
||||
- field: field2
|
||||
type: uint32
|
||||
"#;
|
||||
let pipeline: Pipeline<GreptimeTransformer> =
|
||||
parse(&Content::Yaml(pipeline_yaml.into())).unwrap();
|
||||
let mut payload = pipeline.init_intermediate_state();
|
||||
pipeline.prepare(input_value, &mut payload).unwrap();
|
||||
assert_eq!(
|
||||
&["greptime_timestamp", "my_field"].to_vec(),
|
||||
pipeline.required_keys()
|
||||
);
|
||||
assert_eq!(
|
||||
payload,
|
||||
vec![
|
||||
Value::Null,
|
||||
Value::String("1,2".to_string()),
|
||||
Value::Null,
|
||||
Value::Null
|
||||
]
|
||||
);
|
||||
let result = pipeline.exec_mut(&mut payload).unwrap();
|
||||
let pipeline: Pipeline<GreptimeTransformer> =
|
||||
parse(&Content::Yaml(pipeline_yaml.into())).unwrap();
|
||||
let mut payload = pipeline.init_intermediate_state();
|
||||
pipeline.prepare(input_value, &mut payload).unwrap();
|
||||
assert_eq!(
|
||||
&["greptime_timestamp", "my_field"].to_vec(),
|
||||
pipeline.required_keys()
|
||||
);
|
||||
assert_eq!(
|
||||
payload,
|
||||
vec![
|
||||
Value::Null,
|
||||
Value::String("1,2".to_string()),
|
||||
Value::Null,
|
||||
Value::Null
|
||||
]
|
||||
);
|
||||
let result = pipeline.exec_mut(&mut payload).unwrap();
|
||||
|
||||
assert_eq!(result.values[0].value_data, Some(ValueData::U32Value(1)));
|
||||
assert_eq!(result.values[1].value_data, Some(ValueData::U32Value(2)));
|
||||
match &result.values[2].value_data {
|
||||
Some(ValueData::TimestampNanosecondValue(v)) => {
|
||||
assert_ne!(*v, 0);
|
||||
assert_eq!(result.values[0].value_data, Some(ValueData::U32Value(1)));
|
||||
assert_eq!(result.values[1].value_data, Some(ValueData::U32Value(2)));
|
||||
match &result.values[2].value_data {
|
||||
Some(ValueData::TimestampNanosecondValue(v)) => {
|
||||
assert_ne!(*v, 0);
|
||||
}
|
||||
_ => panic!("expect null value"),
|
||||
}
|
||||
_ => panic!("expect null value"),
|
||||
}
|
||||
{
|
||||
let input_value_str = r#"
|
||||
{
|
||||
"reqTimeSec": "1573840000.000"
|
||||
}
|
||||
"#;
|
||||
|
||||
let pipeline_yaml = r#"
|
||||
---
|
||||
description: Pipeline for Demo Log
|
||||
|
||||
processors:
|
||||
- gsub:
|
||||
field: reqTimeSec
|
||||
pattern: "\\."
|
||||
replacement: ""
|
||||
- epoch:
|
||||
field: reqTimeSec
|
||||
resolution: millisecond
|
||||
ignore_missing: true
|
||||
|
||||
transform:
|
||||
- field: reqTimeSec
|
||||
type: epoch, millisecond
|
||||
index: timestamp
|
||||
"#;
|
||||
let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();
|
||||
let pipeline: Pipeline<GreptimeTransformer> =
|
||||
parse(&Content::Yaml(pipeline_yaml.into())).unwrap();
|
||||
let mut payload = pipeline.init_intermediate_state();
|
||||
pipeline.prepare(input_value, &mut payload).unwrap();
|
||||
assert_eq!(&["reqTimeSec"].to_vec(), pipeline.required_keys());
|
||||
assert_eq!(payload, vec![Value::String("1573840000.000".to_string())]);
|
||||
let result = pipeline.exec_mut(&mut payload).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
result.values[0].value_data,
|
||||
Some(ValueData::TimestampMillisecondValue(1573840000000))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user