feat(pipeline): gsub processor (#4121)

* chore: add log http ingester scaffold

* chore: add some example code

* chore: add log inserter

* chore: add log handler file

* chore: add pipeline lib

* chore: import log handler

* chore: add pipeline http handler

* chore: add pipeline private table

* chore: add pipeline API

* chore: improve error handling

* chore: merge main

* chore: add multi content type support for log handler

* refactor: remove servers dep on pipeline

* refactor: move define_into_tonic_status to common-error

* refactor: bring in pipeline 3eb890c551b8d7f60c4491fcfec18966e2b210a4

* chore: fix typo

* refactor: bring in pipeline a95c9767d7056ab01dd8ca5fa1214456c6ffc72c

* chore: fix typo and license header

* refactor: move http event handler to a separate file

* chore: add test for pipeline

* chore: fmt

* refactor: bring in pipeline 7d2402701877901871dd1294a65ac937605a6a93

* refactor: move `pipeline_operator` to `pipeline` crate

* chore: minor update

* refactor: bring in pipeline 1711f4d46687bada72426d88cda417899e0ae3a4

* chore: add log

* chore: add log

* chore: remove open hook

* chore: minor update

* chore: fix fmt

* chore: minor update

* chore: rename desc for pipeline table

* refactor: remove updated_at in pipelines

* chore: add more content type support for log inserter api

* chore: introduce pipeline crate

* chore: update upload pipeline api

* chore: fix by pr commit

* chore: add some doc for pub fn/struct

* chore: some minor fixes

* chore: add pipeline version support

* chore: impl log pipeline version

* gsub processor

* chore: add test

* chore: update commit

Co-authored-by: dennis zhuang <killme2008@gmail.com>

---------

Co-authored-by: paomian <xpaomian@gmail.com>
Co-authored-by: shuiyisong <xixing.sys@gmail.com>
Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
Co-authored-by: dennis zhuang <killme2008@gmail.com>
This commit is contained in:
yuanbohan
2024-06-17 15:57:47 +08:00
committed by GitHub
parent 0aceebf0a3
commit 0fc18b6865
4 changed files with 314 additions and 0 deletions

View File

@@ -0,0 +1,229 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use regex::Regex;
use crate::etl::field::{Field, Fields};
use crate::etl::processor::{
yaml_bool, yaml_field, yaml_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
IGNORE_MISSING_NAME,
};
use crate::etl::value::{Array, Map, Value};
/// Name under which this processor is registered in pipeline YAML.
pub(crate) const PROCESSOR_GSUB: &str = "gsub";
/// YAML configuration key for the replacement text.
const REPLACEMENT_NAME: &str = "replacement";
/// YAML configuration key for the regex pattern.
const PATTERN_NAME: &str = "pattern";
/// A processor that replaces all matches of a pattern in a string with a
/// replacement. Only string values and arrays of string values are supported.
#[derive(Debug, Default)]
pub struct GsubProcessor {
    // Fields of the input this processor is applied to.
    fields: Fields,
    // Compiled regex; `None` until `try_pattern` succeeds. Required — enforced by `check`.
    pattern: Option<Regex>,
    // Replacement text for every match. Required — enforced by `check`.
    replacement: Option<String>,
    // When true, missing source fields are presumably skipped by the pipeline
    // runner instead of raising an error — confirm against the executor.
    ignore_missing: bool,
}
impl GsubProcessor {
    /// Replaces the processor's field list.
    fn with_fields(&mut self, fields: Fields) {
        self.fields = fields;
    }

    /// Sets the `ignore_missing` flag from configuration.
    fn with_ignore_missing(&mut self, ignore_missing: bool) {
        self.ignore_missing = ignore_missing;
    }

    /// Compiles `pattern` into a regex, mapping the regex error to a `String`.
    fn try_pattern(&mut self, pattern: &str) -> Result<(), String> {
        self.pattern = Some(Regex::new(pattern).map_err(|e| e.to_string())?);
        Ok(())
    }

    /// Sets the replacement text substituted for every pattern match.
    fn with_replacement(&mut self, replacement: impl Into<String>) {
        self.replacement = Some(replacement.into());
    }

    /// Validates that both required options were supplied; called once after
    /// parsing the YAML configuration.
    fn check(self) -> Result<Self, String> {
        if self.pattern.is_none() {
            return Err("pattern is required".to_string());
        }
        if self.replacement.is_none() {
            return Err("replacement is required".to_string());
        }
        Ok(self)
    }

    /// Applies the substitution to a single string value, keyed by the target
    /// field when set, otherwise by the source field name.
    fn process_string_field(&self, val: &str, field: &Field) -> Result<Map, String> {
        // `check` guarantees both options are present on a constructed processor.
        let replacement = self.replacement.as_ref().expect("replacement is required");
        let new_val = self
            .pattern
            .as_ref()
            .expect("pattern is required")
            .replace_all(val, replacement)
            .to_string();
        let val = Value::String(new_val);
        let key = match field.target_field {
            Some(ref target_field) => target_field,
            None => field.get_field(),
        };
        Ok(Map::one(key, val))
    }

    /// Applies the substitution to every element of a string array; any
    /// non-string element is an error.
    fn process_array_field(&self, arr: &Array, field: &Field) -> Result<Map, String> {
        let key = match field.target_field {
            Some(ref target_field) => target_field,
            None => field.get_field(),
        };
        // `check` guarantees both options are present on a constructed processor.
        let re = self.pattern.as_ref().expect("pattern is required");
        let replacement = self.replacement.as_ref().expect("replacement is required");
        let mut result = Array::default();
        for val in arr.iter() {
            match val {
                Value::String(haystack) => {
                    let new_val = re.replace_all(haystack, replacement).to_string();
                    result.push(Value::String(new_val));
                }
                _ => {
                    return Err(format!(
                        "{} processor: expect string or array string, but got {val:?}",
                        self.kind()
                    ))
                }
            }
        }
        Ok(Map::one(key, Value::Array(result)))
    }
}
impl TryFrom<&yaml_rust::yaml::Hash> for GsubProcessor {
    type Error = String;

    /// Builds a `GsubProcessor` from its YAML configuration hash, then runs
    /// `check` to enforce that `pattern` and `replacement` are both present.
    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
        let mut processor = GsubProcessor::default();
        for (k, v) in value.iter() {
            let key = k
                .as_str()
                // `ok_or_else` defers the `format!` allocation to the error path.
                .ok_or_else(|| format!("key must be a string, but got {k:?}"))?;
            match key {
                FIELD_NAME => {
                    processor.with_fields(Fields::one(yaml_field(v, FIELD_NAME)?));
                }
                FIELDS_NAME => {
                    processor.with_fields(yaml_fields(v, FIELDS_NAME)?);
                }
                PATTERN_NAME => {
                    processor.try_pattern(&yaml_string(v, PATTERN_NAME)?)?;
                }
                REPLACEMENT_NAME => {
                    processor.with_replacement(yaml_string(v, REPLACEMENT_NAME)?);
                }
                IGNORE_MISSING_NAME => {
                    processor.with_ignore_missing(yaml_bool(v, IGNORE_MISSING_NAME)?);
                }
                // Unknown keys are ignored for forward compatibility.
                _ => {}
            }
        }
        processor.check()
    }
}
impl crate::etl::processor::Processor for GsubProcessor {
fn kind(&self) -> &str {
PROCESSOR_GSUB
}
fn ignore_missing(&self) -> bool {
self.ignore_missing
}
fn fields(&self) -> &Fields {
&self.fields
}
fn exec_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
match val {
Value::String(val) => self.process_string_field(val, field),
Value::Array(arr) => self.process_array_field(arr, field),
_ => Err(format!(
"{} processor: expect string or array string, but got {val:?}",
self.kind()
)),
}
}
}
#[cfg(test)]
mod tests {
    use crate::etl::field::Field;
    use crate::etl::processor::gsub::GsubProcessor;
    use crate::etl::processor::Processor;
    use crate::etl::value::{Map, Value};

    // Builds a processor that replaces every run of digits with "xxx".
    fn digit_masker() -> GsubProcessor {
        let mut p = GsubProcessor::default();
        p.try_pattern(r"\d+").unwrap();
        p.with_replacement("xxx");
        p
    }

    #[test]
    fn test_string_value() {
        let processor = digit_masker();
        let field = Field::new("message");

        let input = Value::String("123".to_string());
        let output = processor.exec_field(&input, &field).unwrap();

        let expected = Map::one("message", Value::String("xxx".to_string()));
        assert_eq!(output, expected);
    }

    #[test]
    fn test_array_string_value() {
        let processor = digit_masker();
        let field = Field::new("message");

        let input = Value::Array(
            vec![
                Value::String("123".to_string()),
                Value::String("456".to_string()),
            ]
            .into(),
        );
        let output = processor.exec_field(&input, &field).unwrap();

        let expected = Map::one(
            "message",
            Value::Array(
                vec![
                    Value::String("xxx".to_string()),
                    Value::String("xxx".to_string()),
                ]
                .into(),
            ),
        );
        assert_eq!(output, expected);
    }
}

View File

@@ -17,6 +17,7 @@ pub mod csv;
pub mod date;
pub mod dissect;
pub mod epoch;
pub mod gsub;
pub mod letter;
pub mod regex;
pub mod urlencoding;
@@ -29,6 +30,7 @@ use csv::CsvProcessor;
use date::DateProcessor;
use dissect::DissectProcessor;
use epoch::EpochProcessor;
use gsub::GsubProcessor;
use letter::LetterProcessor;
use regex::RegexProcessor;
use urlencoding::UrlEncodingProcessor;
@@ -163,6 +165,7 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<Arc<dyn Processor>, String>
date::PROCESSOR_DATE => Arc::new(DateProcessor::try_from(value)?),
dissect::PROCESSOR_DISSECT => Arc::new(DissectProcessor::try_from(value)?),
epoch::PROCESSOR_EPOCH => Arc::new(EpochProcessor::try_from(value)?),
gsub::PROCESSOR_GSUB => Arc::new(GsubProcessor::try_from(value)?),
letter::PROCESSOR_LETTER => Arc::new(LetterProcessor::try_from(value)?),
regex::PROCESSOR_REGEX => Arc::new(RegexProcessor::try_from(value)?),
urlencoding::PROCESSOR_URL_ENCODING => Arc::new(UrlEncodingProcessor::try_from(value)?),

View File

@@ -45,6 +45,12 @@ impl std::ops::Deref for Array {
}
}
impl std::ops::DerefMut for Array {
    // Exposes mutable `Vec<Value>` methods (push, sort, ...) directly on `Array`.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.values
    }
}
impl IntoIterator for Array {
type Item = Value;
@@ -54,3 +60,9 @@ impl IntoIterator for Array {
self.values.into_iter()
}
}
impl From<Vec<Value>> for Array {
    // Wraps an owned `Vec<Value>` without copying its elements.
    fn from(values: Vec<Value>) -> Self {
        Array { values }
    }
}

View File

@@ -0,0 +1,70 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use greptime_proto::v1::value::ValueData::TimestampMillisecondValue;
use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Value};
#[test]
fn test_gsub() {
    // One Akamai DataStream2-style record: an epoch time in seconds with a
    // fractional part, encoded as a string.
    let input_value_str = r#"
[
  {
    "reqTimeSec": "1573840000.000"
  }
]
"#;
    let input_value: Value = serde_json::from_str::<serde_json::Value>(input_value_str)
        .expect("failed to parse input value")
        .try_into()
        .expect("failed to convert input value");

    // gsub strips the decimal point ("1573840000.000" -> "1573840000000"),
    // then epoch parses the remaining digits as a millisecond timestamp.
    let pipeline_yaml = r#"
---
description: Pipeline for Akamai DataStream2 Log

processors:
  - gsub:
      field: reqTimeSec
      pattern: "\\."
      replacement: ""
  - epoch:
      field: reqTimeSec
      resolution: millisecond
      ignore_missing: true

transform:
  - field: reqTimeSec
    type: epoch, millisecond
    index: timestamp
"#;
    let yaml_content = Content::Yaml(pipeline_yaml.into());
    let pipeline: Pipeline<GreptimeTransformer> =
        parse(&yaml_content).expect("failed to parse pipeline");
    let output = pipeline.exec(input_value).expect("failed to exec pipeline");

    // The transformed column must be typed as a millisecond timestamp and
    // marked as the time index.
    let expected_schema = vec![ColumnSchema {
        column_name: "reqTimeSec".to_string(),
        datatype: ColumnDataType::TimestampMillisecond.into(),
        semantic_type: SemanticType::Timestamp.into(),
        datatype_extension: None,
    }];
    assert_eq!(output.schema, expected_schema);
    assert_eq!(
        output.rows[0].values[0].value_data,
        Some(TimestampMillisecondValue(1573840000000))
    );
}