feat: digest pipeline processor (#5323)

* feat: basic impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add document

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* apply code review comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Apply suggestions from code review

* follow the naming master

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-01-16 17:46:37 +08:00
committed by GitHub
parent ccd2b06b7a
commit 86bd54194a
5 changed files with 511 additions and 1 deletions

View File

@@ -363,6 +363,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Wrong digest pattern: {pattern}"))]
DigestPatternInvalid {
pattern: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Url decoding error"))]
UrlEncodingDecode {
#[snafu(source)]

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Deref;
use std::ops::{Deref, DerefMut};
use std::str::FromStr;
use snafu::OptionExt;
@@ -218,6 +218,12 @@ impl Deref for Fields {
}
}
impl DerefMut for Fields {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl IntoIterator for Fields {
type Item = Field;
type IntoIter = std::vec::IntoIter<Field>;

View File

@@ -16,6 +16,7 @@ pub mod cmcd;
pub mod csv;
pub mod date;
pub mod decolorize;
pub mod digest;
pub mod dissect;
pub mod epoch;
pub mod gsub;
@@ -31,6 +32,7 @@ use cmcd::{CmcdProcessor, CmcdProcessorBuilder};
use csv::{CsvProcessor, CsvProcessorBuilder};
use date::{DateProcessor, DateProcessorBuilder};
use decolorize::{DecolorizeProcessor, DecolorizeProcessorBuilder};
use digest::{DigestProcessor, DigestProcessorBuilder};
use dissect::{DissectProcessor, DissectProcessorBuilder};
use enum_dispatch::enum_dispatch;
use epoch::{EpochProcessor, EpochProcessorBuilder};
@@ -97,6 +99,7 @@ pub enum ProcessorKind {
Date(DateProcessor),
JsonPath(JsonPathProcessor),
Decolorize(DecolorizeProcessor),
Digest(DigestProcessor),
}
/// ProcessorBuilder trait defines the interface for all processor builders
@@ -127,6 +130,7 @@ pub enum ProcessorBuilders {
Date(DateProcessorBuilder),
JsonPath(JsonPathProcessorBuilder),
Decolorize(DecolorizeProcessorBuilder),
Digest(DigestProcessorBuilder),
}
#[derive(Debug, Default)]
@@ -277,6 +281,9 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorBuilders> {
decolorize::PROCESSOR_DECOLORIZE => {
ProcessorBuilders::Decolorize(DecolorizeProcessorBuilder::try_from(value)?)
}
digest::PROCESSOR_DIGEST => {
ProcessorBuilders::Digest(DigestProcessorBuilder::try_from(value)?)
}
_ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
};

View File

@@ -0,0 +1,428 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Digest the input string by removing certain patterns.
//!
//! This processor can help to extract useful information from a string by removing certain patterns,
//! which is often a variable from the log message. Digested fields are stored in a new field with the
//! `_digest` suffix, and can be used for further processing or analysis such as counting template
//! occurrences or similarity analysis.
use std::borrow::Cow;
use ahash::HashSet;
use regex::Regex;
use snafu::OptionExt;
use crate::etl::error::{
Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
};
use crate::etl::field::{Fields, OneInputOneOutputField};
use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, ProcessorBuilder, ProcessorKind, FIELDS_NAME,
FIELD_NAME, IGNORE_MISSING_NAME,
};
use crate::etl::value::Value;
use crate::etl_error::DigestPatternInvalidSnafu;
pub(crate) const PROCESSOR_DIGEST: &str = "digest";
const PRESETS_PATTERNS_NAME: &str = "presets";
const REGEX_PATTERNS_NAME: &str = "regex";
/// Built-in removal patterns that can be selected by name in the processor configuration.
enum PresetPattern {
    /// Runs of decimal digits.
    Numbers,
    /// Text enclosed in straight or curly quotation marks.
    Quoted,
    /// Text enclosed in ASCII or CJK brackets.
    Bracketed,
    /// Hyphenated 8-4-4-4-12 hexadecimal identifiers.
    Uuid,
    /// Dotted IPv4 or bracketed hexadecimal addresses, optionally followed by a port.
    Ip,
}

impl std::fmt::Display for PresetPattern {
    /// Writes the lowercase configuration name of the preset.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let name = match self {
            Self::Numbers => "numbers",
            Self::Quoted => "quoted",
            Self::Bracketed => "bracketed",
            Self::Uuid => "uuid",
            Self::Ip => "ip",
        };
        // `write_str` ignores width/padding flags, exactly like a literal-only `write!`.
        f.write_str(name)
    }
}
impl std::str::FromStr for PresetPattern {
    type Err = Error;

    /// Resolves a preset by its lowercase configuration name.
    ///
    /// # Errors
    ///
    /// Returns `DigestPatternInvalid` carrying the offending text when `pattern`
    /// is not one of the known preset names.
    fn from_str(pattern: &str) -> Result<Self> {
        let preset = match pattern {
            "numbers" => Self::Numbers,
            "quoted" => Self::Quoted,
            "bracketed" => Self::Bracketed,
            "uuid" => Self::Uuid,
            "ip" => Self::Ip,
            _ => return DigestPatternInvalidSnafu { pattern }.fail(),
        };
        Ok(preset)
    }
}
impl PresetPattern {
    /// Compiles the regular expression that matches this preset.
    ///
    /// A fresh [`Regex`] is compiled on every call, so callers should keep the result
    /// rather than calling this repeatedly.
    ///
    /// # Panics
    ///
    /// Panics only if one of the hard-coded patterns below is not a valid regex,
    /// which would be a programming error.
    fn regex(&self) -> Regex {
        let source = match self {
            PresetPattern::Numbers => r"\d+",
            PresetPattern::Quoted => r#"["'“”‘’][^"'“”‘’]*["'“”‘’]"#,
            // The fullwidth brackets (U+FF3B, U+FF3D, U+FF5B, U+FF5D) are written as
            // `\x{..}` escapes. A bare ASCII `[` inside a character class would open a
            // nested class and leave the pattern unparsable, so the escapes keep the
            // pattern valid even if the source text is normalized to ASCII.
            PresetPattern::Bracketed => {
                r#"[({\[<「『【〔\x{FF3B}\x{FF5B}〈《][^(){}\[\]<>「」『』【】〔〕\x{FF3B}\x{FF3D}\x{FF5B}\x{FF5D}〈〉《》]*[)}\]>」』】〕\x{FF3D}\x{FF5D}〉》]"#
            }
            PresetPattern::Uuid => {
                r"\b[0-9a-fA-F]{8}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{12}\b"
            }
            PresetPattern::Ip => {
                r"((\d{1,3}\.){3}\d{1,3}(:\d+)?|(\[[0-9a-fA-F:]+\])(:\d+)?)"
            }
        };
        Regex::new(source).expect("preset digest pattern must be a valid regex")
    }
}
/// Intermediate form of the digest processor parsed from YAML, before field
/// names are resolved against the pipeline's intermediate keys.
#[derive(Debug, Default)]
pub struct DigestProcessorBuilder {
    /// Fields to digest; the YAML conversion sets each target to `<input>_digest`.
    fields: Fields,
    /// Compiled removal patterns, in the order they were pushed while parsing the configuration.
    patterns: Vec<Regex>,
    /// Value of the `ignore_missing` option, copied into the built processor.
    ignore_missing: bool,
}
impl ProcessorBuilder for DigestProcessorBuilder {
    /// Names this processor writes to: each field's target, or its input when no target is set.
    fn output_keys(&self) -> HashSet<&str> {
        let targets = self
            .fields
            .iter()
            .map(|field| field.target_or_input_field());
        targets.collect()
    }

    /// Names this processor reads from.
    fn input_keys(&self) -> HashSet<&str> {
        let sources = self.fields.iter().map(|field| field.input_field());
        sources.collect()
    }

    /// Builds the processor and wraps it in [`ProcessorKind::Digest`].
    fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind> {
        // Resolves to the inherent `build`, which takes precedence over this trait method.
        let processor = self.build(intermediate_keys)?;
        Ok(ProcessorKind::Digest(processor))
    }
}
impl DigestProcessorBuilder {
    /// Resolves every configured field against `intermediate_keys` and produces the
    /// runnable [`DigestProcessor`].
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by `OneInputOneOutputField::build`.
    fn build(self, intermediate_keys: &[String]) -> Result<DigestProcessor> {
        let Self {
            fields,
            patterns,
            ignore_missing,
        } = self;

        // Collecting into `Result` stops at the first field that fails to resolve.
        let resolved = fields
            .into_iter()
            .map(|field| {
                OneInputOneOutputField::build(
                    "digest",
                    intermediate_keys,
                    field.input_field(),
                    field.target_or_input_field(),
                )
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(DigestProcessor {
            fields: resolved,
            ignore_missing,
            patterns,
        })
    }
}
/// Produces a "digest" of a string by deleting every match of the configured patterns.
///
/// No hashing is involved: the output is the input text with the matched spans removed.
#[derive(Debug, Default)]
pub struct DigestProcessor {
    /// Resolved input/output index pairs for the fields to digest.
    fields: Vec<OneInputOneOutputField>,
    /// When `true`, missing or null inputs are skipped instead of raising an error.
    ignore_missing: bool,
    /// Patterns applied in order; each match is replaced with the empty string.
    patterns: Vec<Regex>,
}
impl DigestProcessor {
    /// Applies every configured pattern in order, deleting each match from `val`.
    ///
    /// Always returns `Ok` with a [`Value::String`] holding the remaining text.
    fn process_string(&self, val: &str) -> Result<Value> {
        let mut input = Cow::from(val);
        for pattern in &self.patterns {
            // `replace_all` hands back a borrowed `Cow` when nothing matched, so the
            // working string is only reallocated when a pattern actually removed text.
            if let Cow::Owned(new_string) = pattern.replace_all(&input, "") {
                input = Cow::Owned(new_string);
            }
        }
        Ok(Value::String(input.into_owned()))
    }

    /// Digests a single value.
    ///
    /// # Errors
    ///
    /// Returns `ProcessorExpectString` when `val` is not a [`Value::String`].
    fn process(&self, val: &Value) -> Result<Value> {
        match val {
            Value::String(val) => self.process_string(val),
            _ => ProcessorExpectStringSnafu {
                processor: PROCESSOR_DIGEST,
                v: val.clone(),
            }
            .fail(),
        }
    }
}
impl TryFrom<&yaml_rust::yaml::Hash> for DigestProcessorBuilder {
    type Error = Error;

    /// Parses the `digest` processor section of a pipeline definition.
    ///
    /// Recognized keys are the shared field / fields / ignore-missing options plus
    /// `presets` (list of preset names) and `regex` (list of custom patterns).
    /// Unknown keys are ignored. Every field's target is forced to `<input>_digest`.
    ///
    /// # Errors
    ///
    /// Returns `KeyMustBeString` for a non-string key and `DigestPatternInvalid` when
    /// `presets` / `regex` is not a list of strings, names an unknown preset, or
    /// contains a pattern that does not compile. Errors from the shared YAML helpers
    /// are propagated unchanged.
    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
        let mut fields = Fields::default();
        let mut ignore_missing = false;
        let mut patterns = Vec::new();

        for (k, v) in value.iter() {
            let key = k
                .as_str()
                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
            match key {
                FIELD_NAME => {
                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
                }
                FIELDS_NAME => {
                    fields = yaml_new_fields(v, FIELDS_NAME)?;
                }
                IGNORE_MISSING_NAME => {
                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
                }
                PRESETS_PATTERNS_NAME => {
                    for name in yaml_pattern_strings(v, key)? {
                        let preset_pattern = name.parse::<PresetPattern>()?;
                        patterns.push(preset_pattern.regex());
                    }
                }
                REGEX_PATTERNS_NAME => {
                    for pattern in yaml_pattern_strings(v, key)? {
                        // The pattern comes from user configuration: report a bad
                        // regex as an error instead of panicking.
                        let regex = match Regex::new(pattern) {
                            Ok(regex) => regex,
                            Err(_) => return DigestPatternInvalidSnafu { pattern }.fail(),
                        };
                        patterns.push(regex);
                    }
                }
                _ => {}
            }
        }

        // Digested output never overwrites the input; it always goes to `<input>_digest`.
        for field in fields.iter_mut() {
            field.target_field = Some(format!("{}_digest", field.input_field()));
        }

        Ok(DigestProcessorBuilder {
            fields,
            patterns,
            ignore_missing,
        })
    }
}

/// Reads `value` as a YAML sequence of strings belonging to the configuration key `key`.
///
/// Returns `DigestPatternInvalid` when `value` is not a sequence or when any entry
/// is not a string.
fn yaml_pattern_strings<'a>(value: &'a yaml_rust::Yaml, key: &str) -> Result<Vec<&'a str>> {
    value
        .as_vec()
        .with_context(|| DigestPatternInvalidSnafu {
            pattern: key.to_string(),
        })?
        .iter()
        .map(|entry| {
            entry.as_str().with_context(|| DigestPatternInvalidSnafu {
                pattern: format!("{}: {:?}", key, entry),
            })
        })
        .collect()
}
impl crate::etl::processor::Processor for DigestProcessor {
    /// Name under which this processor is configured.
    fn kind(&self) -> &str {
        PROCESSOR_DIGEST
    }

    /// Whether absent or null inputs are skipped rather than reported.
    fn ignore_missing(&self) -> bool {
        self.ignore_missing
    }

    /// Digests each configured field of `val`, storing the result at the field's output index.
    ///
    /// # Errors
    ///
    /// Returns `ProcessorMissingField` for an absent or null input unless
    /// `ignore_missing` is set, and propagates the error from `process` for
    /// non-string inputs.
    fn exec_mut(&self, val: &mut Vec<Value>) -> Result<()> {
        for field in &self.fields {
            let source = match val.get(field.input_index()) {
                Some(Value::Null) | None => {
                    if self.ignore_missing {
                        continue;
                    }
                    return ProcessorMissingFieldSnafu {
                        processor: self.kind(),
                        field: field.input_name(),
                    }
                    .fail();
                }
                Some(present) => present,
            };

            let digested = self.process(source)?;
            let output_index = field.output_index();
            val[output_index] = digested;
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a processor with no fields that applies only `patterns`.
    fn digest_with(patterns: Vec<Regex>) -> DigestProcessor {
        DigestProcessor {
            fields: vec![],
            ignore_missing: false,
            patterns,
        }
    }

    /// Checks that every `(input, expected)` pair digests to the stated output.
    fn assert_digests(processor: &DigestProcessor, cases: &[(&str, &str)]) {
        for &(input, expected) in cases {
            let result = processor
                .process(&Value::String(input.to_string()))
                .unwrap();
            assert_eq!(result, Value::String(expected.to_string()));
        }
    }

    #[test]
    fn test_digest_processor_ip() {
        let processor = digest_with(vec![PresetPattern::Ip.regex()]);
        assert_digests(
            &processor,
            &[
                ("192.168.1.1", ""),
                ("192.168.1.1:8080", ""),
                ("[2001:0db8:85a3:0000:0000:8a2e:0370:7334]", ""),
                ("[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:8080", ""),
                ("not an ip", "not an ip"),
            ],
        );
    }

    #[test]
    fn test_digest_processor_uuid() {
        let processor = digest_with(vec![PresetPattern::Uuid.regex()]);
        assert_digests(
            &processor,
            &[
                // UUID v4
                ("123e4567-e89b-12d3-a456-426614174000", ""),
                // UUID v1
                ("6ba7b810-9dad-11d1-80b4-00c04fd430c8", ""),
                // UUID v5
                ("886313e1-3b8a-5372-9b90-0c9aee199e5d", ""),
                // UUID with uppercase letters
                ("A987FBC9-4BED-3078-CF07-9141BA07C9F3", ""),
                // Negative case
                ("not a uuid", "not a uuid"),
            ],
        );
    }

    #[test]
    fn test_digest_processor_brackets() {
        let processor = digest_with(vec![PresetPattern::Bracketed.regex()]);
        assert_digests(
            &processor,
            &[
                // Basic brackets
                ("[content]", ""),
                ("(content)", ""),
                // Chinese brackets
                ("「content」", ""),
                ("『content』", ""),
                ("【content】", ""),
                // Unmatched/unclosed brackets should not match
                ("[content", "[content"),
                ("content]", "content]"),
                // Bad case
                ("[content}", ""),
                // Negative case
                ("no brackets", "no brackets"),
            ],
        );
    }

    #[test]
    fn test_digest_processor_quotes() {
        let processor = digest_with(vec![PresetPattern::Quoted.regex()]);
        assert_digests(
            &processor,
            &[
                ("\"quoted content\"", ""),
                ("no quotes", "no quotes"),
                ("", ""),
            ],
        );
    }

    #[test]
    fn test_digest_processor_custom_regex() {
        let processor = digest_with(vec![Regex::new(r"\d+").unwrap()]);
        assert_digests(
            &processor,
            &[("12345", ""), ("no digits", "no digits"), ("", "")],
        );
    }
}

View File

@@ -707,3 +707,66 @@ transform:
let expected = StringValue("Success and Error".into());
assert_eq!(expected, r[0]);
}
/// End-to-end check of the `digest` processor: each `<field>_digest` column holds the
/// input with the matching token deleted.
#[test]
fn test_digest() {
    let input_value = serde_json::json!({
        "message": "hello world",
        "message_with_ip": "hello 192.168.1.1 world",
        "message_with_uuid": "hello 123e4567-e89b-12d3-a456-426614174000 world",
        "message_with_quote": "hello 'quoted text' world",
        "message_bracket": "hello [bracketed text] world",
        "message_with_foobar": "hello foobar world"
    });

    // The nesting matters: `fields`, `presets` and `regex` must sit under `digest`,
    // and `type` must sit in the same transform entry as its `fields`.
    let pipeline_yaml = r#"
processors:
  - digest:
      fields:
        - message
        - message_with_ip
        - message_with_uuid
        - message_with_quote
        - message_bracket
        - message_with_foobar
      presets:
        - ip
        - uuid
        - bracketed
        - quoted
      regex:
        - foobar

transform:
  - fields:
      - message_with_ip_digest
      - message_with_uuid_digest
      - message_with_quote_digest
      - message_bracket_digest
      - message_with_foobar_digest
    type: string
"#;

    let yaml_content = Content::Yaml(pipeline_yaml);
    let pipeline: Pipeline<GreptimeTransformer> = parse(&yaml_content).unwrap();

    let mut status = pipeline.init_intermediate_state();
    pipeline.prepare(input_value, &mut status).unwrap();
    let row = pipeline.exec_mut(&mut status).unwrap();

    let mut r = row
        .values
        .into_iter()
        .map(|v| v.value_data.unwrap())
        .collect::<Vec<_>>();
    r.pop(); // remove the timestamp value

    // Only the matched token is deleted, so the space on each side of it remains:
    // every digested message ends up with two consecutive spaces.
    let expected = vec![
        StringValue("hello  world".into()),
        StringValue("hello  world".into()),
        StringValue("hello  world".into()),
        StringValue("hello  world".into()),
        StringValue("hello  world".into()),
    ];
    assert_eq!(expected, r);
}