fix: pipeline dissect error is returned directly to the user instead of printing a warn log (#4709)

* fix: pipeline dissect error is returned directly to the user instead of printing a warn log

* chore: add more tests for pipeline
Author:    localhost
Date:      2024-09-13 02:21:05 +08:00
Committer: GitHub
Parent:    67fb3d003e
Commit:    36b1bafbf0

4 changed files with 98 additions and 17 deletions
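
A minimal, self-contained Rust sketch of the behavior change described in the commit message: the dissect error is now propagated to the caller instead of being downgraded to a warn log. The `process` function, its error string, and the use of `eprintln!` in place of `common_telemetry::warn!` are simplified stand-ins, not the real processor API.

// Sketch only: `process` stands in for the dissect processor's pattern matching.
fn process(_input: &str) -> Result<Vec<(usize, String)>, String> {
    // Pretend the pattern never matches, as in the new `test_parse_failure` test.
    Err("No matching pattern found".to_string())
}

// Old behavior: swallow the error and only emit a warning; the caller never sees it.
fn exec_old(input: &str, out: &mut Vec<String>) -> Result<(), String> {
    match process(input) {
        Ok(kvs) => {
            for (k, v) in kvs {
                out[k] = v;
            }
        }
        Err(e) => eprintln!("dissect processor: {}", e), // warn!(...) in the real code
    }
    Ok(())
}

// New behavior: propagate the error with `?` so it reaches the user.
fn exec_new(input: &str, out: &mut Vec<String>) -> Result<(), String> {
    let kvs = process(input)?;
    for (k, v) in kvs {
        out[k] = v;
    }
    Ok(())
}

fn main() {
    let mut out = vec![String::new(); 3];
    assert!(exec_old("key1 key2", &mut out).is_ok()); // error only visible in the log
    assert_eq!(
        exec_new("key1 key2", &mut out).unwrap_err(),
        "No matching pattern found"
    );
}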

View File

@@ -21,7 +21,7 @@ pub mod value;
 use ahash::HashSet;
 use common_telemetry::debug;
-use itertools::{merge, Itertools};
+use itertools::Itertools;
 use processor::{Processor, ProcessorBuilder, Processors};
 use transform::{TransformBuilders, Transformer, Transforms};
 use value::Value;
@@ -91,13 +91,18 @@ where
     debug!("required_keys: {:?}", required_keys);

     // intermediate keys are the keys that all processor and transformer required
-    let ordered_intermediate_keys: Vec<String> =
-        merge(processors_required_keys, transforms_required_keys)
-            .cloned()
-            .collect::<HashSet<String>>()
-            .into_iter()
-            .sorted()
-            .collect();
+    let ordered_intermediate_keys: Vec<String> = [
+        processors_required_keys,
+        transforms_required_keys,
+        processors_output_keys,
+    ]
+    .iter()
+    .flat_map(|l| l.iter())
+    .collect::<HashSet<&String>>()
+    .into_iter()
+    .sorted()
+    .cloned()
+    .collect_vec();

     let mut final_intermediate_keys = Vec::with_capacity(ordered_intermediate_keys.len());
     let mut intermediate_keys_exclude_original =

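For context, a dependency-free sketch of the key-merging logic in the hunk above: the required keys of processors and transforms, plus the processors' output keys, are flattened into a set to de-duplicate them and then sorted into a stable order. `std::collections::HashSet` and `Vec::sort` stand in here for the `ahash` and `itertools` used in the real code, and the key lists are hypothetical.

use std::collections::HashSet;

// Merge several key lists, de-duplicate, and produce a stable sorted order,
// mirroring the `ordered_intermediate_keys` construction above.
fn ordered_intermediate_keys(lists: &[&[String]]) -> Vec<String> {
    let mut keys: Vec<String> = lists
        .iter()
        .flat_map(|l| l.iter())
        .collect::<HashSet<&String>>()
        .into_iter()
        .cloned()
        .collect();
    keys.sort();
    keys
}

fn main() {
    // Hypothetical key lists for illustration only.
    let processors_required = vec!["str".to_string(), "key1".to_string()];
    let transforms_required = vec!["key1".to_string()];
    let processors_output = vec!["key1".to_string(), "key2".to_string()];

    let keys = ordered_intermediate_keys(&[
        &processors_required,
        &transforms_required,
        &processors_output,
    ]);
    assert_eq!(keys, vec!["key1", "key2", "str"]);
}
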
View File

@@ -817,16 +817,12 @@ impl Processor for DissectProcessor {
         for field in self.fields.iter() {
             let index = field.input_index();
             match val.get(index) {
-                Some(Value::String(val_str)) => match self.process(val_str) {
-                    Ok(r) => {
-                        for (k, v) in r {
-                            val[k] = v;
-                        }
-                    }
-                    Err(e) => {
-                        warn!("dissect processor: {}", e);
-                    }
-                },
+                Some(Value::String(val_str)) => {
+                    let r = self.process(val_str)?;
+                    for (k, v) in r {
+                        val[k] = v;
+                    }
+                }
                 Some(Value::Null) | None => {
                     if !self.ignore_missing {
                         return Err(format!(

View File

@@ -247,3 +247,37 @@ transform:
         Some(StringValue("key1_key2".to_string()))
     );
 }
+
+#[test]
+fn test_parse_failure() {
+    let input_str = r#"
+    {
+        "str": "key1 key2"
+    }"#;
+
+    let pipeline_yaml = r#"
+processors:
+  - dissect:
+      field: str
+      patterns:
+        - "%{key1} %{key2} %{key3}"
+
+transform:
+  - fields:
+      - key1
+    type: string
+"#;
+
+    let input_value = serde_json::from_str::<serde_json::Value>(input_str).unwrap();
+
+    let yaml_content = pipeline::Content::Yaml(pipeline_yaml.into());
+    let pipeline: pipeline::Pipeline<pipeline::GreptimeTransformer> =
+        pipeline::parse(&yaml_content).expect("failed to parse pipeline");
+    let mut result = pipeline.init_intermediate_state();
+
+    pipeline.prepare(input_value, &mut result).unwrap();
+
+    let row = pipeline.exec_mut(&mut result);
+    assert!(row.is_err());
+    assert_eq!(row.err().unwrap(), "No matching pattern found");
+}

View File

@@ -122,3 +122,49 @@ transform:
     assert_eq!(output.rows[0].values[0].value_data, None);
 }
+
+#[test]
+fn test_unuse_regex_group() {
+    let input_value_str = r#"
+    [
+      {
+        "str": "123 456"
+      }
+    ]
+"#;
+
+    let pipeline_yaml = r#"
+processors:
+  - regex:
+      fields:
+        - str
+      pattern: "(?<id1>\\d+) (?<id2>\\d+)"
+
+transform:
+  - field: str_id1
+    type: string
+"#;
+
+    let output = common::parse_and_exec(input_value_str, pipeline_yaml);
+
+    assert_eq!(
+        output.schema,
+        vec![
+            common::make_column_schema(
+                "str_id1".to_string(),
+                ColumnDataType::String,
+                SemanticType::Field,
+            ),
+            common::make_column_schema(
+                "greptime_timestamp".to_string(),
+                ColumnDataType::TimestampNanosecond,
+                SemanticType::Timestamp,
+            ),
+        ]
+    );
+
+    assert_eq!(
+        output.rows[0].values[0].value_data,
+        Some(StringValue("123".to_string()))
+    );
+}