mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat: add decolorize processor (#5065)
* feat: add decolorize processor
  Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* Update src/pipeline/src/etl/processor/cmcd.rs
* add crate level integration test
  Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
---------
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
@@ -15,6 +15,7 @@
|
||||
pub mod cmcd;
|
||||
pub mod csv;
|
||||
pub mod date;
|
||||
pub mod decolorize;
|
||||
pub mod dissect;
|
||||
pub mod epoch;
|
||||
pub mod gsub;
|
||||
@@ -29,6 +30,7 @@ use ahash::{HashSet, HashSetExt};
|
||||
use cmcd::{CmcdProcessor, CmcdProcessorBuilder};
|
||||
use csv::{CsvProcessor, CsvProcessorBuilder};
|
||||
use date::{DateProcessor, DateProcessorBuilder};
|
||||
use decolorize::{DecolorizeProcessor, DecolorizeProcessorBuilder};
|
||||
use dissect::{DissectProcessor, DissectProcessorBuilder};
|
||||
use enum_dispatch::enum_dispatch;
|
||||
use epoch::{EpochProcessor, EpochProcessorBuilder};
|
||||
@@ -61,11 +63,6 @@ const TARGET_FIELDS_NAME: &str = "target_fields";
|
||||
const JSON_PATH_NAME: &str = "json_path";
|
||||
const JSON_PATH_RESULT_INDEX_NAME: &str = "result_index";
|
||||
|
||||
// const IF_NAME: &str = "if";
|
||||
// const IGNORE_FAILURE_NAME: &str = "ignore_failure";
|
||||
// const ON_FAILURE_NAME: &str = "on_failure";
|
||||
// const TAG_NAME: &str = "tag";
|
||||
|
||||
/// Processor trait defines the interface for all processors.
|
||||
///
|
||||
/// A processor is a transformation that can be applied to a field in a document
|
||||
@@ -99,6 +96,7 @@ pub enum ProcessorKind {
|
||||
Epoch(EpochProcessor),
|
||||
Date(DateProcessor),
|
||||
JsonPath(JsonPathProcessor),
|
||||
Decolorize(DecolorizeProcessor),
|
||||
}
|
||||
|
||||
/// ProcessorBuilder trait defines the interface for all processor builders
|
||||
@@ -128,6 +126,7 @@ pub enum ProcessorBuilders {
|
||||
Epoch(EpochProcessorBuilder),
|
||||
Date(DateProcessorBuilder),
|
||||
JsonPath(JsonPathProcessorBuilder),
|
||||
Decolorize(DecolorizeProcessorBuilder),
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
@@ -275,6 +274,9 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorBuilders> {
|
||||
json_path::PROCESSOR_JSON_PATH => {
|
||||
ProcessorBuilders::JsonPath(json_path::JsonPathProcessorBuilder::try_from(value)?)
|
||||
}
|
||||
decolorize::PROCESSOR_DECOLORIZE => {
|
||||
ProcessorBuilders::Decolorize(DecolorizeProcessorBuilder::try_from(value)?)
|
||||
}
|
||||
_ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
|
||||
};
|
||||
|
||||
|
||||
@@ -12,6 +12,10 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Pipeline Processor for CMCD (Common Media Client Data) data.
|
||||
//!
|
||||
//! Refer to [`CmcdProcessor`] for more information.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use ahash::HashSet;
|
||||
|
||||
195
src/pipeline/src/etl/processor/decolorize.rs
Normal file
195
src/pipeline/src/etl/processor/decolorize.rs
Normal file
@@ -0,0 +1,195 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Removes ANSI color control codes from the input text.
|
||||
//!
|
||||
//! Similar to [`decolorize`](https://grafana.com/docs/loki/latest/query/log_queries/#removing-color-codes)
|
||||
//! from Grafana Loki and [`strip_ansi_escape_codes`](https://vector.dev/docs/reference/vrl/functions/#strip_ansi_escape_codes)
|
||||
//! from Vector VRL.
|
||||
|
||||
use ahash::HashSet;
|
||||
use once_cell::sync::Lazy;
|
||||
use regex::Regex;
|
||||
use snafu::OptionExt;
|
||||
|
||||
use crate::etl::error::{
|
||||
Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
|
||||
};
|
||||
use crate::etl::field::{Fields, OneInputOneOutputField};
|
||||
use crate::etl::processor::{
|
||||
yaml_bool, yaml_new_field, yaml_new_fields, ProcessorBuilder, ProcessorKind, FIELDS_NAME,
|
||||
FIELD_NAME, IGNORE_MISSING_NAME,
|
||||
};
|
||||
use crate::etl::value::Value;
|
||||
|
||||
/// Name of this processor: the key matched when parsing a processor entry
/// and the value returned by `DecolorizeProcessor::kind`.
pub(crate) const PROCESSOR_DECOLORIZE: &str = "decolorize";
|
||||
|
||||
/// Matches one ANSI color (SGR) escape sequence: `ESC [`, then any run of
/// digits and `;`, terminated by `m`. Built lazily on first use and shared
/// afterwards; the `unwrap` is on a constant pattern.
static RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\x1b\[[0-9;]*m").unwrap());
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct DecolorizeProcessorBuilder {
|
||||
fields: Fields,
|
||||
ignore_missing: bool,
|
||||
}
|
||||
|
||||
impl ProcessorBuilder for DecolorizeProcessorBuilder {
|
||||
fn output_keys(&self) -> HashSet<&str> {
|
||||
self.fields
|
||||
.iter()
|
||||
.map(|f| f.target_or_input_field())
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn input_keys(&self) -> HashSet<&str> {
|
||||
self.fields.iter().map(|f| f.input_field()).collect()
|
||||
}
|
||||
|
||||
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind> {
|
||||
self.build(intermediate_keys).map(ProcessorKind::Decolorize)
|
||||
}
|
||||
}
|
||||
|
||||
impl DecolorizeProcessorBuilder {
|
||||
fn build(self, intermediate_keys: &[String]) -> Result<DecolorizeProcessor> {
|
||||
let mut real_fields = vec![];
|
||||
for field in self.fields.into_iter() {
|
||||
let input = OneInputOneOutputField::build(
|
||||
"decolorize",
|
||||
intermediate_keys,
|
||||
field.input_field(),
|
||||
field.target_or_input_field(),
|
||||
)?;
|
||||
real_fields.push(input);
|
||||
}
|
||||
Ok(DecolorizeProcessor {
|
||||
fields: real_fields,
|
||||
ignore_missing: self.ignore_missing,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove ANSI color control codes from the input text.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct DecolorizeProcessor {
|
||||
fields: Vec<OneInputOneOutputField>,
|
||||
ignore_missing: bool,
|
||||
}
|
||||
|
||||
impl DecolorizeProcessor {
|
||||
fn process_string(&self, val: &str) -> Result<Value> {
|
||||
Ok(Value::String(RE.replace_all(val, "").into_owned()))
|
||||
}
|
||||
|
||||
fn process(&self, val: &Value) -> Result<Value> {
|
||||
match val {
|
||||
Value::String(val) => self.process_string(val),
|
||||
_ => ProcessorExpectStringSnafu {
|
||||
processor: PROCESSOR_DECOLORIZE,
|
||||
v: val.clone(),
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&yaml_rust::yaml::Hash> for DecolorizeProcessorBuilder {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
|
||||
let mut fields = Fields::default();
|
||||
let mut ignore_missing = false;
|
||||
|
||||
for (k, v) in value.iter() {
|
||||
let key = k
|
||||
.as_str()
|
||||
.with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
|
||||
|
||||
match key {
|
||||
FIELD_NAME => {
|
||||
fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
|
||||
}
|
||||
FIELDS_NAME => {
|
||||
fields = yaml_new_fields(v, FIELDS_NAME)?;
|
||||
}
|
||||
IGNORE_MISSING_NAME => {
|
||||
ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(DecolorizeProcessorBuilder {
|
||||
fields,
|
||||
ignore_missing,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl crate::etl::processor::Processor for DecolorizeProcessor {
|
||||
fn kind(&self) -> &str {
|
||||
PROCESSOR_DECOLORIZE
|
||||
}
|
||||
|
||||
fn ignore_missing(&self) -> bool {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<()> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_index();
|
||||
match val.get(index) {
|
||||
Some(Value::Null) | None => {
|
||||
if !self.ignore_missing {
|
||||
return ProcessorMissingFieldSnafu {
|
||||
processor: self.kind(),
|
||||
field: field.input_name(),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
}
|
||||
Some(v) => {
|
||||
let result = self.process(v)?;
|
||||
let output_index = field.output_index();
|
||||
val[output_index] = result;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// Exercises `DecolorizeProcessor::process` directly; the field list is
    /// empty because `process` does not consult it.
    #[test]
    fn test_decolorize_processor() {
        let processor = DecolorizeProcessor {
            fields: vec![],
            ignore_missing: false,
        };

        let val = Value::String("\x1b[32mGreen\x1b[0m".to_string());
        let result = processor.process(&val).unwrap();
        assert_eq!(result, Value::String("Green".to_string()));

        let val = Value::String("Plain text".to_string());
        let result = processor.process(&val).unwrap();
        assert_eq!(result, Value::String("Plain text".to_string()));

        let val = Value::String("\x1b[46mfoo\x1b[0m bar".to_string());
        let result = processor.process(&val).unwrap();
        assert_eq!(result, Value::String("foo bar".to_string()));

        // Sequences with several `;`-separated parameters are removed whole.
        let val = Value::String("\x1b[1;31mBold red\x1b[0m".to_string());
        let result = processor.process(&val).unwrap();
        assert_eq!(result, Value::String("Bold red".to_string()));

        // Anything other than a string is rejected rather than passed through.
        assert!(processor.process(&Value::Null).is_err());
    }
}
|
||||
@@ -644,7 +644,6 @@ impl DissectProcessor {
|
||||
let mut pos = 0;
|
||||
|
||||
let mut appends: HashMap<usize, Vec<(String, u32)>> = HashMap::new();
|
||||
// let mut maps: HashMap<usize, (String,String)> = HashMap::new();
|
||||
|
||||
let mut process_name_value = |name: &Name, value: String| {
|
||||
let name_index = name.index;
|
||||
@@ -658,22 +657,6 @@ impl DissectProcessor {
|
||||
.or_default()
|
||||
.push((value, order.unwrap_or_default()));
|
||||
}
|
||||
// Some(StartModifier::MapKey) => match maps.get(&name_index) {
|
||||
// Some(map_val) => {
|
||||
// map.insert(value, Value::String(map_val.to_string()));
|
||||
// }
|
||||
// None => {
|
||||
// maps.insert(name_index, value);
|
||||
// }
|
||||
// },
|
||||
// Some(StartModifier::MapVal) => match maps.get(&name_index) {
|
||||
// Some(map_key) => {
|
||||
// map.insert(map_key, Value::String(value));
|
||||
// }
|
||||
// None => {
|
||||
// maps.insert(name_index, value);
|
||||
// }
|
||||
// },
|
||||
Some(_) => {
|
||||
// do nothing, ignore MapKey and MapVal
|
||||
// because transform can know the key name
|
||||
|
||||
@@ -132,10 +132,6 @@ impl GsubProcessor {
|
||||
v: val.clone(),
|
||||
}
|
||||
.fail(),
|
||||
// Err(format!(
|
||||
// "{} processor: expect string or array string, but got {val:?}",
|
||||
// self.kind()
|
||||
// )),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -674,3 +674,36 @@ transform:
|
||||
|
||||
assert_eq!(expected, r);
|
||||
}
|
||||
|
||||
/// End-to-end check: a pipeline with a single `decolorize` processor strips
/// the color escape sequences from the `message` field before it is
/// transformed into a string column.
#[test]
fn test_decolorize() {
    let input_value = serde_json::json!({
        "message": "\u{001b}[32mSuccess\u{001b}[0m and \u{001b}[31mError\u{001b}[0m"
    });

    // YAML nesting is significant: `fields` belongs to `decolorize`, and
    // `type` belongs to the same transform entry as its `fields` list.
    let pipeline_yaml = r#"
processors:
  - decolorize:
      fields:
        - message
transform:
  - fields:
      - message
    type: string
"#;
    let yaml_content = Content::Yaml(pipeline_yaml.into());
    let pipeline: Pipeline<GreptimeTransformer> = parse(&yaml_content).unwrap();

    let mut status = pipeline.init_intermediate_state();
    pipeline.prepare(input_value, &mut status).unwrap();
    let row = pipeline.exec_mut(&mut status).unwrap();

    let r = row
        .values
        .into_iter()
        .map(|v| v.value_data.unwrap())
        .collect::<Vec<_>>();

    let expected = StringValue("Success and Error".into());
    assert_eq!(expected, r[0]);
}
|
||||
|
||||
Reference in New Issue
Block a user