diff --git a/src/pipeline/src/etl/processor/gsub.rs b/src/pipeline/src/etl/processor/gsub.rs index 736d55998d..2a2fb87902 100644 --- a/src/pipeline/src/etl/processor/gsub.rs +++ b/src/pipeline/src/etl/processor/gsub.rs @@ -17,14 +17,13 @@ use regex::Regex; use crate::etl::field::{Field, Fields}; use crate::etl::processor::{ yaml_bool, yaml_field, yaml_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME, - IGNORE_MISSING_NAME, + IGNORE_MISSING_NAME, PATTERN_NAME, }; use crate::etl::value::{Array, Map, Value}; pub(crate) const PROCESSOR_GSUB: &str = "gsub"; const REPLACEMENT_NAME: &str = "replacement"; -const PATTERN_NAME: &str = "pattern"; /// A processor to replace all matches of a pattern in string by a replacement, only support string value, and array string value #[derive(Debug, Default)] diff --git a/src/pipeline/src/etl/processor/join.rs b/src/pipeline/src/etl/processor/join.rs new file mode 100644 index 0000000000..da9de55188 --- /dev/null +++ b/src/pipeline/src/etl/processor/join.rs @@ -0,0 +1,149 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::etl::field::{Field, Fields}; +use crate::etl::processor::{ + yaml_bool, yaml_field, yaml_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME, + IGNORE_MISSING_NAME, SEPARATOR_NAME, +}; +use crate::etl::value::{Array, Map, Value}; + +pub(crate) const PROCESSOR_JOIN: &str = "join"; + +/// A processor to join each element of an array into a single string using a separator string between each element +#[derive(Debug, Default)] +pub struct JoinProcessor { + fields: Fields, + separator: Option, + ignore_missing: bool, +} + +impl JoinProcessor { + fn with_fields(&mut self, fields: Fields) { + self.fields = fields; + } + + fn with_separator(&mut self, separator: impl Into) { + self.separator = Some(separator.into()); + } + + fn with_ignore_missing(&mut self, ignore_missing: bool) { + self.ignore_missing = ignore_missing; + } + + fn process_field(&self, arr: &Array, field: &Field) -> Result { + let key = match field.target_field { + Some(ref target_field) => target_field, + None => field.get_field(), + }; + + let sep = self.separator.as_ref().unwrap(); + let val = arr + .iter() + .map(|v| v.to_str_value()) + .collect::>() + .join(sep); + + Ok(Map::one(key, Value::String(val))) + } + + fn check(self) -> Result { + if self.separator.is_none() { + return Err("separator is required".to_string()); + } + + Ok(self) + } +} + +impl TryFrom<&yaml_rust::yaml::Hash> for JoinProcessor { + type Error = String; + + fn try_from(value: &yaml_rust::yaml::Hash) -> Result { + let mut processor = JoinProcessor::default(); + + for (k, v) in value.iter() { + let key = k + .as_str() + .ok_or(format!("key must be a string, but got {k:?}"))?; + match key { + FIELD_NAME => { + processor.with_fields(Fields::one(yaml_field(v, FIELD_NAME)?)); + } + FIELDS_NAME => { + processor.with_fields(yaml_fields(v, FIELDS_NAME)?); + } + SEPARATOR_NAME => { + processor.with_separator(yaml_string(v, SEPARATOR_NAME)?); + } + IGNORE_MISSING_NAME => { + processor.with_ignore_missing(yaml_bool(v, IGNORE_MISSING_NAME)?); + } + _ => {} + } + } + + processor.check() + } +} + +impl Processor for JoinProcessor { + fn fields(&self) -> &Fields { + &self.fields + } + + fn kind(&self) -> &str { + PROCESSOR_JOIN + } + + fn ignore_missing(&self) -> bool { + self.ignore_missing + } + + fn exec_field(&self, val: &Value, field: &Field) -> Result { + match val { + Value::Array(arr) => self.process_field(arr, field), + _ => Err(format!( + "{} processor: expect array value, but got {val:?}", + self.kind() + )), + } + } +} + +#[cfg(test)] +mod tests { + + use crate::etl::field::Field; + use crate::etl::processor::join::JoinProcessor; + use crate::etl::processor::Processor; + use crate::etl::value::{Map, Value}; + + #[test] + fn test_join_processor() { + let mut processor = JoinProcessor::default(); + processor.with_separator("-"); + + let field = Field::new("test"); + let arr = Value::Array( + vec![ + Value::String("a".to_string()), + Value::String("b".to_string()), + ] + .into(), + ); + let result = processor.exec_field(&arr, &field).unwrap(); + assert_eq!(result, Map::one("test", Value::String("a-b".to_string()))); + } +} diff --git a/src/pipeline/src/etl/processor/mod.rs b/src/pipeline/src/etl/processor/mod.rs index a5782d2a14..319b15cae3 100644 --- a/src/pipeline/src/etl/processor/mod.rs +++ b/src/pipeline/src/etl/processor/mod.rs @@ -18,6 +18,7 @@ pub mod date; pub mod dissect; pub mod epoch; pub mod gsub; +pub mod join; pub mod letter; pub mod regex; pub mod urlencoding; @@ -31,6 +32,7 @@ use date::DateProcessor; use dissect::DissectProcessor; use epoch::EpochProcessor; use gsub::GsubProcessor; +use join::JoinProcessor; use letter::LetterProcessor; use regex::RegexProcessor; use urlencoding::UrlEncodingProcessor; @@ -42,7 +44,9 @@ const FIELD_NAME: &str = "field"; const FIELDS_NAME: &str = "fields"; const IGNORE_MISSING_NAME: &str = "ignore_missing"; const METHOD_NAME: &str = "method"; +const PATTERN_NAME: &str = "pattern"; const PATTERNS_NAME: &str = "patterns"; +const SEPARATOR_NAME: &str = "separator"; // const IF_NAME: &str = "if"; // const IGNORE_FAILURE_NAME: &str = "ignore_failure"; @@ -166,6 +170,7 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result, String> dissect::PROCESSOR_DISSECT => Arc::new(DissectProcessor::try_from(value)?), epoch::PROCESSOR_EPOCH => Arc::new(EpochProcessor::try_from(value)?), gsub::PROCESSOR_GSUB => Arc::new(GsubProcessor::try_from(value)?), + join::PROCESSOR_JOIN => Arc::new(JoinProcessor::try_from(value)?), letter::PROCESSOR_LETTER => Arc::new(LetterProcessor::try_from(value)?), regex::PROCESSOR_REGEX => Arc::new(RegexProcessor::try_from(value)?), urlencoding::PROCESSOR_URL_ENCODING => Arc::new(UrlEncodingProcessor::try_from(value)?), diff --git a/src/pipeline/tests/common.rs b/src/pipeline/tests/common.rs new file mode 100644 index 0000000000..cf75fd773b --- /dev/null +++ b/src/pipeline/tests/common.rs @@ -0,0 +1,30 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use greptime_proto::v1::Rows; +use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Value}; + +/// test util function to parse and execute pipeline +pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows { + let input_value: Value = serde_json::from_str::(input_str) + .expect("failed to parse into json") + .try_into() + .expect("failed to convert into value"); + + let yaml_content = Content::Yaml(pipeline_yaml.into()); + let pipeline: Pipeline = + parse(&yaml_content).expect("failed to parse pipeline"); + + pipeline.exec(input_value).expect("failed to exec pipeline") +} diff --git a/src/pipeline/tests/gsub.rs b/src/pipeline/tests/gsub.rs index 94fa687533..5d25bf188b 100644 --- a/src/pipeline/tests/gsub.rs +++ b/src/pipeline/tests/gsub.rs @@ -14,7 +14,8 @@ use greptime_proto::v1::value::ValueData::TimestampMillisecondValue; use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType}; -use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Value}; + +mod common; #[test] fn test_gsub() { @@ -25,10 +26,6 @@ fn test_gsub() { } ] "#; - let input_value: Value = serde_json::from_str::(input_value_str) - .expect("failed to parse input value") - .try_into() - .expect("failed to convert input value"); let pipeline_yaml = r#" --- @@ -50,10 +47,7 @@ transform: index: timestamp "#; - let yaml_content = Content::Yaml(pipeline_yaml.into()); - let pipeline: Pipeline = - parse(&yaml_content).expect("failed to parse pipeline"); - let output = pipeline.exec(input_value).expect("failed to exec pipeline"); + let output = common::parse_and_exec(input_value_str, pipeline_yaml); let expected_schema = vec![ColumnSchema { column_name: "reqTimeSec".to_string(), diff --git a/src/pipeline/tests/join.rs b/src/pipeline/tests/join.rs new file mode 100644 index 0000000000..b7c8c627d2 --- /dev/null +++ b/src/pipeline/tests/join.rs @@ -0,0 +1,121 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use greptime_proto::v1::value::ValueData::StringValue; +use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType}; +use lazy_static::lazy_static; + +mod common; + +const PIPELINE_YAML: &str = r#" +--- +processors: + - join: + field: join_test + separator: "-" + +transform: + - field: join_test + type: string +"#; + +lazy_static! { + pub static ref EXPECTED_SCHEMA: Vec = vec![ + ColumnSchema { + column_name: "join_test".to_string(), + datatype: ColumnDataType::String.into(), + semantic_type: SemanticType::Field.into(), + datatype_extension: None, + }, + ColumnSchema { + column_name: "greptime_timestamp".to_string(), + datatype: ColumnDataType::TimestampNanosecond.into(), + semantic_type: SemanticType::Timestamp.into(), + datatype_extension: None, + }, + ]; +} + +#[test] +fn test_simple_join() { + let input_value_str = r#" + [ + { + "join_test": ["a", "b", "c"] + } + ] +"#; + + let output = common::parse_and_exec(input_value_str, PIPELINE_YAML); + + assert_eq!(output.schema, *EXPECTED_SCHEMA); + assert_eq!( + output.rows[0].values[0].value_data, + Some(StringValue("a-b-c".to_string())) + ); +} + +#[test] +fn test_integer_join() { + let input_value_str = r#" + [ + { + "join_test": [1, 2, 3] + } + ] +"#; + let output = common::parse_and_exec(input_value_str, PIPELINE_YAML); + + assert_eq!(output.schema, *EXPECTED_SCHEMA); + assert_eq!( + output.rows[0].values[0].value_data, + Some(StringValue("1-2-3".to_string())) + ); +} + +#[test] +fn test_boolean() { + let input_value_str = r#" + [ + { + "join_test": [true, false, true] + } + ] +"#; + let output = common::parse_and_exec(input_value_str, PIPELINE_YAML); + + assert_eq!(output.schema, *EXPECTED_SCHEMA); + assert_eq!( + output.rows[0].values[0].value_data, + Some(StringValue("true-false-true".to_string())) + ); +} + +#[test] +fn test_float() { + let input_value_str = r#" + [ + { + "join_test": [1.1, 1.2, 1.3] + } + ] +"#; + let output = common::parse_and_exec(input_value_str, PIPELINE_YAML); + + assert_eq!(output.schema, *EXPECTED_SCHEMA); + assert_eq!( + output.rows[0].values[0].value_data, + Some(StringValue("1.1-1.2-1.3".to_string())) + ); +} diff --git a/src/pipeline/tests/on_failure.rs b/src/pipeline/tests/on_failure.rs index 4934048e19..c0d69f4415 100644 --- a/src/pipeline/tests/on_failure.rs +++ b/src/pipeline/tests/on_failure.rs @@ -14,7 +14,8 @@ use greptime_proto::v1::value::ValueData::{U16Value, U8Value}; use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType}; -use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Value}; + +mod common; #[test] fn test_on_failure_with_ignore() { @@ -25,10 +26,6 @@ fn test_on_failure_with_ignore() { } ] "#; - let input_value: Value = serde_json::from_str::(input_value_str) - .expect("failed to parse input value") - .try_into() - .expect("failed to convert input value"); let pipeline_yaml = r#" --- @@ -40,11 +37,7 @@ transform: type: uint8 on_failure: ignore "#; - - let yaml_content = Content::Yaml(pipeline_yaml.into()); - let pipeline: Pipeline = - parse(&yaml_content).expect("failed to parse pipeline"); - let output = pipeline.exec(input_value).expect("failed to exec pipeline"); + let output = common::parse_and_exec(input_value_str, pipeline_yaml); let expected_schema = vec![ ColumnSchema { @@ -74,10 +67,6 @@ fn test_on_failure_with_default() { } ] "#; - let input_value: Value = serde_json::from_str::(input_value_str) - .expect("failed to parse input value") - .try_into() - .expect("failed to convert input value"); let pipeline_yaml = r#" --- @@ -91,10 +80,7 @@ transform: on_failure: default "#; - let yaml_content = Content::Yaml(pipeline_yaml.into()); - let pipeline: Pipeline = - parse(&yaml_content).expect("failed to parse pipeline"); - let output = pipeline.exec(input_value).expect("failed to exec pipeline"); + let output = common::parse_and_exec(input_value_str, pipeline_yaml); let expected_schema = vec![ ColumnSchema { @@ -120,10 +106,6 @@ fn test_default() { let input_value_str = r#" [{}] "#; - let input_value: Value = serde_json::from_str::(input_value_str) - .expect("failed to parse input value") - .try_into() - .expect("failed to convert input value"); let pipeline_yaml = r#" --- @@ -136,10 +118,7 @@ transform: default: 0 "#; - let yaml_content = Content::Yaml(pipeline_yaml.into()); - let pipeline: Pipeline = - parse(&yaml_content).expect("failed to parse pipeline"); - let output = pipeline.exec(input_value).expect("failed to exec pipeline"); + let output = common::parse_and_exec(input_value_str, pipeline_yaml); let expected_schema = vec![ ColumnSchema { @@ -170,10 +149,6 @@ fn test_multiple_on_failure() { } ] "#; - let input_value: Value = serde_json::from_str::(input_value_str) - .expect("failed to parse input value") - .try_into() - .expect("failed to convert input value"); let pipeline_yaml = r#" --- @@ -192,10 +167,7 @@ transform: on_failure: default "#; - let yaml_content = Content::Yaml(pipeline_yaml.into()); - let pipeline: Pipeline = - parse(&yaml_content).expect("failed to parse pipeline"); - let output = pipeline.exec(input_value).expect("failed to exec pipeline"); + let output = common::parse_and_exec(input_value_str, pipeline_yaml); let expected_schema = vec![ ColumnSchema {