feat: add filter processor to v0.15 (#6516)

feat: add filter processor

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
This commit is contained in:
shuiyisong
2025-07-14 17:43:49 +08:00
committed by GitHub
parent 941906dc74
commit 32bffbb668
5 changed files with 327 additions and 0 deletions

View File

@@ -229,6 +229,7 @@ impl DispatchedTo {
pub enum PipelineExecOutput {
Transformed(TransformedOutput),
DispatchedTo(DispatchedTo, Value),
Filtered,
}
#[derive(Debug)]
@@ -309,6 +310,10 @@ impl Pipeline {
// process
for processor in self.processors.iter() {
val = processor.exec_mut(val)?;
if val.is_null() {
// line is filtered
return Ok(PipelineExecOutput::Filtered);
}
}
// dispatch, fast return if matched

View File

@@ -19,6 +19,7 @@ pub mod decolorize;
pub mod digest;
pub mod dissect;
pub mod epoch;
pub mod filter;
pub mod gsub;
pub mod join;
pub mod json_parse;
@@ -54,6 +55,7 @@ use crate::error::{
Result, UnsupportedProcessorSnafu,
};
use crate::etl::field::{Field, Fields};
use crate::etl::processor::filter::FilterProcessor;
use crate::etl::processor::json_parse::JsonParseProcessor;
use crate::etl::processor::select::SelectProcessor;
use crate::etl::processor::simple_extract::SimpleExtractProcessor;
@@ -146,6 +148,7 @@ pub enum ProcessorKind {
Digest(DigestProcessor),
Select(SelectProcessor),
Vrl(VrlProcessor),
Filter(FilterProcessor),
}
#[derive(Debug, Default)]
@@ -226,6 +229,7 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorKind> {
}
vrl::PROCESSOR_VRL => ProcessorKind::Vrl(VrlProcessor::try_from(value)?),
select::PROCESSOR_SELECT => ProcessorKind::Select(SelectProcessor::try_from(value)?),
filter::PROCESSOR_FILTER => ProcessorKind::Filter(FilterProcessor::try_from(value)?),
_ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
};

View File

@@ -0,0 +1,242 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use ahash::{HashSet, HashSetExt};
use snafu::OptionExt;
use crate::error::{
Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
ValueMustBeMapSnafu,
};
use crate::etl::field::Fields;
use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, FIELDS_NAME, FIELD_NAME,
};
use crate::{Processor, Value};
pub(crate) const PROCESSOR_FILTER: &str = "filter";
const MATCH_MODE_NAME: &str = "mode";
const MATCH_OP_NAME: &str = "match_op";
const CASE_INSENSITIVE_NAME: &str = "case_insensitive";
const TARGETS_NAME: &str = "targets";
#[derive(Debug)]
enum MatchMode {
SimpleMatch(MatchOp),
}
impl Default for MatchMode {
fn default() -> Self {
Self::SimpleMatch(MatchOp::default())
}
}
#[derive(Debug, Default)]
enum MatchOp {
#[default]
In,
NotIn,
}
/// Filter out the whole line if matches.
/// Ultimately it's a condition check, maybe we can use VRL to do more complex check.
/// Implement simple string match for now. Can be extended later.
#[derive(Debug, Default)]
pub struct FilterProcessor {
fields: Fields,
mode: MatchMode,
case_insensitive: bool,
targets: HashSet<String>,
}
impl TryFrom<&yaml_rust::yaml::Hash> for FilterProcessor {
type Error = Error;
// match mode can be extended in the future
#[allow(clippy::single_match)]
fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
let mut fields = Fields::default();
let mut mode = MatchMode::default();
let mut op = MatchOp::default();
let mut case_insensitive = true;
let mut targets = HashSet::new();
for (k, v) in value.iter() {
let key = k
.as_str()
.with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
match key {
FIELD_NAME => fields = Fields::one(yaml_new_field(v, FIELD_NAME)?),
FIELDS_NAME => fields = yaml_new_fields(v, FIELDS_NAME)?,
MATCH_MODE_NAME => match yaml_string(v, MATCH_MODE_NAME)?.as_str() {
"simple" => mode = MatchMode::SimpleMatch(MatchOp::In),
_ => {}
},
MATCH_OP_NAME => match yaml_string(v, MATCH_OP_NAME)?.as_str() {
"in" => op = MatchOp::In,
"not_in" => op = MatchOp::NotIn,
_ => {}
},
CASE_INSENSITIVE_NAME => case_insensitive = yaml_bool(v, CASE_INSENSITIVE_NAME)?,
TARGETS_NAME => {
yaml_strings(v, TARGETS_NAME)?
.into_iter()
.filter(|s| !s.is_empty())
.for_each(|s| {
targets.insert(s);
});
}
_ => {}
}
}
if matches!(mode, MatchMode::SimpleMatch(_)) {
mode = MatchMode::SimpleMatch(op);
}
if targets.is_empty() {
return ProcessorMissingFieldSnafu {
processor: PROCESSOR_FILTER,
field: TARGETS_NAME.to_string(),
}
.fail();
}
if case_insensitive {
targets = targets.into_iter().map(|s| s.to_lowercase()).collect();
}
Ok(FilterProcessor {
fields,
mode,
case_insensitive,
targets,
})
}
}
impl FilterProcessor {
fn match_target(&self, input: String) -> bool {
let input = if self.case_insensitive {
input.to_lowercase()
} else {
input
};
match &self.mode {
MatchMode::SimpleMatch(op) => match op {
MatchOp::In => self.targets.contains(&input),
MatchOp::NotIn => !self.targets.contains(&input),
},
}
}
}
impl Processor for FilterProcessor {
fn kind(&self) -> &str {
PROCESSOR_FILTER
}
fn ignore_missing(&self) -> bool {
true
}
fn exec_mut(&self, mut val: Value) -> Result<Value> {
let v_map = val.as_map_mut().context(ValueMustBeMapSnafu)?;
for field in self.fields.iter() {
let index = field.input_field();
match v_map.get(index) {
Some(Value::String(s)) => {
if self.match_target(s.clone()) {
return Ok(Value::Null);
}
}
Some(v) => {
return ProcessorExpectStringSnafu {
processor: self.kind(),
v: v.clone(),
}
.fail();
}
None => {}
}
}
Ok(val)
}
}
#[cfg(test)]
mod test {
use ahash::HashSet;
use crate::etl::field::{Field, Fields};
use crate::etl::processor::filter::{FilterProcessor, MatchMode, MatchOp};
use crate::{Map, Processor, Value};
#[test]
fn test_eq() {
let processor = FilterProcessor {
fields: Fields::one(Field::new("name", None)),
mode: MatchMode::SimpleMatch(MatchOp::In),
case_insensitive: false,
targets: HashSet::from_iter(vec!["John".to_string()]),
};
let val = Value::Map(Map::one("name", Value::String("John".to_string())));
let result = processor.exec_mut(val).unwrap();
assert_eq!(result, Value::Null);
let val = Value::Map(Map::one("name", Value::String("Wick".to_string())));
let expect = val.clone();
let result = processor.exec_mut(val).unwrap();
assert_eq!(result, expect);
}
#[test]
fn test_ne() {
let processor = FilterProcessor {
fields: Fields::one(Field::new("name", None)),
mode: MatchMode::SimpleMatch(MatchOp::NotIn),
case_insensitive: false,
targets: HashSet::from_iter(vec!["John".to_string()]),
};
let val = Value::Map(Map::one("name", Value::String("John".to_string())));
let expect = val.clone();
let result = processor.exec_mut(val).unwrap();
assert_eq!(result, expect);
let val = Value::Map(Map::one("name", Value::String("Wick".to_string())));
let result = processor.exec_mut(val).unwrap();
assert_eq!(result, Value::Null);
}
#[test]
fn test_case() {
let processor = FilterProcessor {
fields: Fields::one(Field::new("name", None)),
mode: MatchMode::SimpleMatch(MatchOp::In),
case_insensitive: true,
targets: HashSet::from_iter(vec!["john".to_string()]),
};
let val = Value::Map(Map::one("name", Value::String("JoHN".to_string())));
let result = processor.exec_mut(val).unwrap();
assert_eq!(result, Value::Null);
}
}

View File

@@ -167,6 +167,9 @@ async fn run_custom_pipeline(
PipelineExecOutput::DispatchedTo(dispatched_to, val) => {
push_to_map!(dispatched, dispatched_to, val, arr_len);
}
PipelineExecOutput::Filtered => {
continue;
}
}
}