feat(pipeline): join processor (#4158)

* feat: add join processor

* test: add simple join test

* chore: fix header

* chore: update commit

Co-authored-by: dennis zhuang <killme2008@gmail.com>

* test: add more join test

* chore: fix lint

* chore: update comment

---------

Co-authored-by: dennis zhuang <killme2008@gmail.com>
Author: shuiyisong
Committed: 2024-06-18 13:00:34 +08:00 (by GitHub)
Parent: 141d017576
Commit: cb657ae51e
7 changed files with 315 additions and 45 deletions
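
For context, a minimal pipeline definition exercising the new processor; the field name and separator mirror the test added in src/pipeline/tests/join.rs in this diff:

---
processors:
  - join:
      field: join_test
      separator: "-"

transform:
  - field: join_test
    type: string

Given the input {"join_test": ["a", "b", "c"]}, the join_test field is emitted as the string "a-b-c".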


@@ -17,14 +17,13 @@ use regex::Regex;
use crate::etl::field::{Field, Fields};
use crate::etl::processor::{
yaml_bool, yaml_field, yaml_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
IGNORE_MISSING_NAME,
IGNORE_MISSING_NAME, PATTERN_NAME,
};
use crate::etl::value::{Array, Map, Value};
pub(crate) const PROCESSOR_GSUB: &str = "gsub";
const REPLACEMENT_NAME: &str = "replacement";
const PATTERN_NAME: &str = "pattern";
/// A processor that replaces all matches of a pattern in a string with a replacement. Only string values and arrays of string values are supported.
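/// For example (illustrative values), with pattern `\.` and replacement `-`, the input `"2024.06.18"` becomes `"2024-06-18"`.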
#[derive(Debug, Default)]


@@ -0,0 +1,149 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::etl::field::{Field, Fields};
use crate::etl::processor::{
yaml_bool, yaml_field, yaml_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
IGNORE_MISSING_NAME, SEPARATOR_NAME,
};
use crate::etl::value::{Array, Map, Value};
pub(crate) const PROCESSOR_JOIN: &str = "join";
/// A processor that joins the elements of an array into a single string, inserting a separator between elements.
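/// For example, with separator `-`, the array `["a", "b", "c"]` is joined into `"a-b-c"`.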
#[derive(Debug, Default)]
pub struct JoinProcessor {
fields: Fields,
separator: Option<String>,
ignore_missing: bool,
}
impl JoinProcessor {
fn with_fields(&mut self, fields: Fields) {
self.fields = fields;
}
fn with_separator(&mut self, separator: impl Into<String>) {
self.separator = Some(separator.into());
}
fn with_ignore_missing(&mut self, ignore_missing: bool) {
self.ignore_missing = ignore_missing;
}
fn process_field(&self, arr: &Array, field: &Field) -> Result<Map, String> {
let key = match field.target_field {
Some(ref target_field) => target_field,
None => field.get_field(),
};
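// `check()` rejects a missing separator at construction time, so this unwrap does not panic for processors built via `TryFrom`.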
let sep = self.separator.as_ref().unwrap();
let val = arr
.iter()
.map(|v| v.to_str_value())
.collect::<Vec<String>>()
.join(sep);
Ok(Map::one(key, Value::String(val)))
}
fn check(self) -> Result<Self, String> {
if self.separator.is_none() {
return Err("separator is required".to_string());
}
Ok(self)
}
}
impl TryFrom<&yaml_rust::yaml::Hash> for JoinProcessor {
type Error = String;
fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
let mut processor = JoinProcessor::default();
for (k, v) in value.iter() {
let key = k
.as_str()
.ok_or(format!("key must be a string, but got {k:?}"))?;
match key {
FIELD_NAME => {
processor.with_fields(Fields::one(yaml_field(v, FIELD_NAME)?));
}
FIELDS_NAME => {
processor.with_fields(yaml_fields(v, FIELDS_NAME)?);
}
SEPARATOR_NAME => {
processor.with_separator(yaml_string(v, SEPARATOR_NAME)?);
}
IGNORE_MISSING_NAME => {
processor.with_ignore_missing(yaml_bool(v, IGNORE_MISSING_NAME)?);
}
_ => {}
}
}
processor.check()
}
}
impl Processor for JoinProcessor {
fn fields(&self) -> &Fields {
&self.fields
}
fn kind(&self) -> &str {
PROCESSOR_JOIN
}
fn ignore_missing(&self) -> bool {
self.ignore_missing
}
fn exec_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
match val {
Value::Array(arr) => self.process_field(arr, field),
_ => Err(format!(
"{} processor: expect array value, but got {val:?}",
self.kind()
)),
}
}
}
#[cfg(test)]
mod tests {
use crate::etl::field::Field;
use crate::etl::processor::join::JoinProcessor;
use crate::etl::processor::Processor;
use crate::etl::value::{Map, Value};
#[test]
fn test_join_processor() {
let mut processor = JoinProcessor::default();
processor.with_separator("-");
let field = Field::new("test");
let arr = Value::Array(
vec![
Value::String("a".to_string()),
Value::String("b".to_string()),
]
.into(),
);
let result = processor.exec_field(&arr, &field).unwrap();
assert_eq!(result, Map::one("test", Value::String("a-b".to_string())));
}
}


@@ -18,6 +18,7 @@ pub mod date;
pub mod dissect;
pub mod epoch;
pub mod gsub;
pub mod join;
pub mod letter;
pub mod regex;
pub mod urlencoding;
@@ -31,6 +32,7 @@ use date::DateProcessor;
use dissect::DissectProcessor;
use epoch::EpochProcessor;
use gsub::GsubProcessor;
use join::JoinProcessor;
use letter::LetterProcessor;
use regex::RegexProcessor;
use urlencoding::UrlEncodingProcessor;
@@ -42,7 +44,9 @@ const FIELD_NAME: &str = "field";
const FIELDS_NAME: &str = "fields";
const IGNORE_MISSING_NAME: &str = "ignore_missing";
const METHOD_NAME: &str = "method";
const PATTERN_NAME: &str = "pattern";
const PATTERNS_NAME: &str = "patterns";
const SEPARATOR_NAME: &str = "separator";
// const IF_NAME: &str = "if";
// const IGNORE_FAILURE_NAME: &str = "ignore_failure";
@@ -166,6 +170,7 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<Arc<dyn Processor>, String>
dissect::PROCESSOR_DISSECT => Arc::new(DissectProcessor::try_from(value)?),
epoch::PROCESSOR_EPOCH => Arc::new(EpochProcessor::try_from(value)?),
gsub::PROCESSOR_GSUB => Arc::new(GsubProcessor::try_from(value)?),
join::PROCESSOR_JOIN => Arc::new(JoinProcessor::try_from(value)?),
letter::PROCESSOR_LETTER => Arc::new(LetterProcessor::try_from(value)?),
regex::PROCESSOR_REGEX => Arc::new(RegexProcessor::try_from(value)?),
urlencoding::PROCESSOR_URL_ENCODING => Arc::new(UrlEncodingProcessor::try_from(value)?),


@@ -0,0 +1,30 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use greptime_proto::v1::Rows;
use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Value};
/// Test utility: parse the given pipeline YAML and execute it against the JSON input string, returning the resulting rows.
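/// Typical usage from a test: `common::parse_and_exec(input_json_str, pipeline_yaml_str)`.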
pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
let input_value: Value = serde_json::from_str::<serde_json::Value>(input_str)
.expect("failed to parse into json")
.try_into()
.expect("failed to convert into value");
let yaml_content = Content::Yaml(pipeline_yaml.into());
let pipeline: Pipeline<GreptimeTransformer> =
parse(&yaml_content).expect("failed to parse pipeline");
pipeline.exec(input_value).expect("failed to exec pipeline")
}


@@ -14,7 +14,8 @@
use greptime_proto::v1::value::ValueData::TimestampMillisecondValue;
use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Value};
mod common;
#[test]
fn test_gsub() {
@@ -25,10 +26,6 @@ fn test_gsub() {
}
]
"#;
let input_value: Value = serde_json::from_str::<serde_json::Value>(input_value_str)
.expect("failed to parse input value")
.try_into()
.expect("failed to convert input value");
let pipeline_yaml = r#"
---
@@ -50,10 +47,7 @@ transform:
index: timestamp
"#;
let yaml_content = Content::Yaml(pipeline_yaml.into());
let pipeline: Pipeline<GreptimeTransformer> =
parse(&yaml_content).expect("failed to parse pipeline");
let output = pipeline.exec(input_value).expect("failed to exec pipeline");
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
let expected_schema = vec![ColumnSchema {
column_name: "reqTimeSec".to_string(),

src/pipeline/tests/join.rs (new file)

@@ -0,0 +1,121 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use greptime_proto::v1::value::ValueData::StringValue;
use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
use lazy_static::lazy_static;
mod common;
const PIPELINE_YAML: &str = r#"
---
processors:
- join:
field: join_test
separator: "-"
transform:
- field: join_test
type: string
"#;
lazy_static! {
pub static ref EXPECTED_SCHEMA: Vec<ColumnSchema> = vec![
ColumnSchema {
column_name: "join_test".to_string(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Field.into(),
datatype_extension: None,
},
ColumnSchema {
column_name: "greptime_timestamp".to_string(),
datatype: ColumnDataType::TimestampNanosecond.into(),
semantic_type: SemanticType::Timestamp.into(),
datatype_extension: None,
},
];
}
#[test]
fn test_simple_join() {
let input_value_str = r#"
[
{
"join_test": ["a", "b", "c"]
}
]
"#;
let output = common::parse_and_exec(input_value_str, PIPELINE_YAML);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(
output.rows[0].values[0].value_data,
Some(StringValue("a-b-c".to_string()))
);
}
#[test]
fn test_integer_join() {
let input_value_str = r#"
[
{
"join_test": [1, 2, 3]
}
]
"#;
let output = common::parse_and_exec(input_value_str, PIPELINE_YAML);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(
output.rows[0].values[0].value_data,
Some(StringValue("1-2-3".to_string()))
);
}
#[test]
fn test_boolean() {
let input_value_str = r#"
[
{
"join_test": [true, false, true]
}
]
"#;
let output = common::parse_and_exec(input_value_str, PIPELINE_YAML);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(
output.rows[0].values[0].value_data,
Some(StringValue("true-false-true".to_string()))
);
}
#[test]
fn test_float() {
let input_value_str = r#"
[
{
"join_test": [1.1, 1.2, 1.3]
}
]
"#;
let output = common::parse_and_exec(input_value_str, PIPELINE_YAML);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(
output.rows[0].values[0].value_data,
Some(StringValue("1.1-1.2-1.3".to_string()))
);
}


@@ -14,7 +14,8 @@
use greptime_proto::v1::value::ValueData::{U16Value, U8Value};
use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Value};
mod common;
#[test]
fn test_on_failure_with_ignore() {
@@ -25,10 +26,6 @@ fn test_on_failure_with_ignore() {
}
]
"#;
let input_value: Value = serde_json::from_str::<serde_json::Value>(input_value_str)
.expect("failed to parse input value")
.try_into()
.expect("failed to convert input value");
let pipeline_yaml = r#"
---
@@ -40,11 +37,7 @@ transform:
type: uint8
on_failure: ignore
"#;
let yaml_content = Content::Yaml(pipeline_yaml.into());
let pipeline: Pipeline<GreptimeTransformer> =
parse(&yaml_content).expect("failed to parse pipeline");
let output = pipeline.exec(input_value).expect("failed to exec pipeline");
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
let expected_schema = vec![
ColumnSchema {
@@ -74,10 +67,6 @@ fn test_on_failure_with_default() {
}
]
"#;
let input_value: Value = serde_json::from_str::<serde_json::Value>(input_value_str)
.expect("failed to parse input value")
.try_into()
.expect("failed to convert input value");
let pipeline_yaml = r#"
---
@@ -91,10 +80,7 @@ transform:
on_failure: default
"#;
let yaml_content = Content::Yaml(pipeline_yaml.into());
let pipeline: Pipeline<GreptimeTransformer> =
parse(&yaml_content).expect("failed to parse pipeline");
let output = pipeline.exec(input_value).expect("failed to exec pipeline");
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
let expected_schema = vec![
ColumnSchema {
@@ -120,10 +106,6 @@ fn test_default() {
let input_value_str = r#"
[{}]
"#;
let input_value: Value = serde_json::from_str::<serde_json::Value>(input_value_str)
.expect("failed to parse input value")
.try_into()
.expect("failed to convert input value");
let pipeline_yaml = r#"
---
@@ -136,10 +118,7 @@ transform:
default: 0
"#;
let yaml_content = Content::Yaml(pipeline_yaml.into());
let pipeline: Pipeline<GreptimeTransformer> =
parse(&yaml_content).expect("failed to parse pipeline");
let output = pipeline.exec(input_value).expect("failed to exec pipeline");
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
let expected_schema = vec![
ColumnSchema {
@@ -170,10 +149,6 @@ fn test_multiple_on_failure() {
}
]
"#;
let input_value: Value = serde_json::from_str::<serde_json::Value>(input_value_str)
.expect("failed to parse input value")
.try_into()
.expect("failed to convert input value");
let pipeline_yaml = r#"
---
@@ -192,10 +167,7 @@ transform:
on_failure: default
"#;
let yaml_content = Content::Yaml(pipeline_yaml.into());
let pipeline: Pipeline<GreptimeTransformer> =
parse(&yaml_content).expect("failed to parse pipeline");
let output = pipeline.exec(input_value).expect("failed to exec pipeline");
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
let expected_schema = vec![
ColumnSchema {