feat(pipeline): filter processor (#6502)

* feat: add filter processor

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* test: add tests

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: change target to list and use `in` and `not_in`

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: rebase main and fix error

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
Authored by shuiyisong on 2025-07-14 07:18:42 +08:00, committed by GitHub
parent e5e10fd362
commit 582bcc3b14
5 changed files with 344 additions and 0 deletions
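
For context, the keys parsed by the new processor (see `FilterProcessor::try_from` in the diff below) would allow a configuration like the following. This is a sketch based on this diff, written as a raw string in the style of the integration tests, not an excerpt from the repository:

let filter_pipeline = r#"
processors:
  - filter:
      field: name             # or `fields:` for several input columns
      mode: simple            # the only mode recognized by this commit
      match_op: in            # "in" drops rows whose value is in targets; "not_in" drops the rest
      case_insensitive: true  # the default; targets are lowercased once at parse time
      targets:
        - john
        - jane
"#;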


@@ -227,6 +227,7 @@ impl DispatchedTo {
pub enum PipelineExecOutput {
    Transformed(TransformedOutput),
    DispatchedTo(DispatchedTo, VrlValue),
    Filtered,
}

#[derive(Debug)]
@@ -273,6 +274,10 @@ impl Pipeline {
        // process
        for processor in self.processors.iter() {
            val = processor.exec_mut(val)?;
            if val.is_null() {
                // line is filtered
                return Ok(PipelineExecOutput::Filtered);
            }
        }

        // dispatch, fast return if matched
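
The loop above treats a null value as a sentinel: any processor may return `VrlValue::Null` to drop the current line, and the pipeline short-circuits into the new `Filtered` variant. A minimal, self-contained sketch of that pattern with hypothetical types (not this crate's API):

#[derive(Debug, PartialEq)]
enum ExecOutput {
    Transformed(String),
    Filtered,
}

// Each "processor" may map its input to None to signal "drop this line",
// mirroring the `val.is_null()` check in the diff above.
fn run(processors: &[fn(Option<String>) -> Option<String>], mut val: Option<String>) -> ExecOutput {
    for p in processors {
        val = p(val);
        if val.is_none() {
            return ExecOutput::Filtered;
        }
    }
    ExecOutput::Transformed(val.unwrap_or_default())
}

fn main() {
    let drop_john: fn(Option<String>) -> Option<String> =
        |v| v.filter(|s| s.as_str() != "John");
    assert_eq!(run(&[drop_john], Some("John".into())), ExecOutput::Filtered);
    assert_eq!(
        run(&[drop_john], Some("Jane".into())),
        ExecOutput::Transformed("Jane".into())
    );
}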


@@ -19,6 +19,7 @@ pub mod decolorize;
pub mod digest;
pub mod dissect;
pub mod epoch;
pub mod filter;
pub mod gsub;
pub mod join;
pub mod json_parse;
@@ -55,6 +56,7 @@ use crate::error::{
    Result, UnsupportedProcessorSnafu,
};
use crate::etl::field::{Field, Fields};
use crate::etl::processor::filter::FilterProcessor;
use crate::etl::processor::json_parse::JsonParseProcessor;
use crate::etl::processor::select::SelectProcessor;
use crate::etl::processor::simple_extract::SimpleExtractProcessor;
@@ -146,6 +148,7 @@ pub enum ProcessorKind {
    Digest(DigestProcessor),
    Select(SelectProcessor),
    Vrl(VrlProcessor),
    Filter(FilterProcessor),
}

#[derive(Debug, Default)]
@@ -226,6 +229,7 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorKind> {
        }
        vrl_processor::PROCESSOR_VRL => ProcessorKind::Vrl(VrlProcessor::try_from(value)?),
        select::PROCESSOR_SELECT => ProcessorKind::Select(SelectProcessor::try_from(value)?),
        filter::PROCESSOR_FILTER => ProcessorKind::Filter(FilterProcessor::try_from(value)?),
        _ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
    };


@@ -0,0 +1,259 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use ahash::{HashSet, HashSetExt};
use snafu::OptionExt;
use vrl::prelude::Value as VrlValue;

use crate::error::{
    Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
    ValueMustBeMapSnafu,
};
use crate::etl::field::Fields;
use crate::etl::processor::{
    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, FIELDS_NAME, FIELD_NAME,
};
use crate::Processor;

pub(crate) const PROCESSOR_FILTER: &str = "filter";

const MATCH_MODE_NAME: &str = "mode";
const MATCH_OP_NAME: &str = "match_op";
const CASE_INSENSITIVE_NAME: &str = "case_insensitive";
const TARGETS_NAME: &str = "targets";

#[derive(Debug)]
enum MatchMode {
    SimpleMatch(MatchOp),
}

impl Default for MatchMode {
    fn default() -> Self {
        Self::SimpleMatch(MatchOp::default())
    }
}

#[derive(Debug, Default)]
enum MatchOp {
    #[default]
    In,
    NotIn,
}

/// Filter out the whole line if matches.
/// Ultimately it's a condition check, maybe we can use VRL to do more complex check.
/// Implement simple string match for now. Can be extended later.
#[derive(Debug, Default)]
pub struct FilterProcessor {
    fields: Fields,
    mode: MatchMode,
    case_insensitive: bool,
    targets: HashSet<String>,
}

impl TryFrom<&yaml_rust::yaml::Hash> for FilterProcessor {
    type Error = Error;

    // match mode can be extended in the future
    #[allow(clippy::single_match)]
    fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
        let mut fields = Fields::default();
        let mut mode = MatchMode::default();
        let mut op = MatchOp::default();
        let mut case_insensitive = true;
        let mut targets = HashSet::new();

        for (k, v) in value.iter() {
            let key = k
                .as_str()
                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
            match key {
                FIELD_NAME => fields = Fields::one(yaml_new_field(v, FIELD_NAME)?),
                FIELDS_NAME => fields = yaml_new_fields(v, FIELDS_NAME)?,
                MATCH_MODE_NAME => match yaml_string(v, MATCH_MODE_NAME)?.as_str() {
                    "simple" => mode = MatchMode::SimpleMatch(MatchOp::In),
                    _ => {}
                },
                MATCH_OP_NAME => match yaml_string(v, MATCH_OP_NAME)?.as_str() {
                    "in" => op = MatchOp::In,
                    "not_in" => op = MatchOp::NotIn,
                    _ => {}
                },
                CASE_INSENSITIVE_NAME => case_insensitive = yaml_bool(v, CASE_INSENSITIVE_NAME)?,
                TARGETS_NAME => {
                    yaml_strings(v, TARGETS_NAME)?
                        .into_iter()
                        .filter(|s| !s.is_empty())
                        .for_each(|s| {
                            targets.insert(s);
                        });
                }
                _ => {}
            }
        }

        if matches!(mode, MatchMode::SimpleMatch(_)) {
            mode = MatchMode::SimpleMatch(op);
        }

        if targets.is_empty() {
            return ProcessorMissingFieldSnafu {
                processor: PROCESSOR_FILTER,
                field: TARGETS_NAME.to_string(),
            }
            .fail();
        }

        if case_insensitive {
            targets = targets.into_iter().map(|s| s.to_lowercase()).collect();
        }

        Ok(FilterProcessor {
            fields,
            mode,
            case_insensitive,
            targets,
        })
    }
}

impl FilterProcessor {
    fn match_target(&self, input: &str) -> bool {
        let input = if self.case_insensitive {
            &input.to_lowercase()
        } else {
            input
        };

        match &self.mode {
            MatchMode::SimpleMatch(op) => match op {
                MatchOp::In => self.targets.contains(input),
                MatchOp::NotIn => !self.targets.contains(input),
            },
        }
    }
}

impl Processor for FilterProcessor {
    fn kind(&self) -> &str {
        PROCESSOR_FILTER
    }

    fn ignore_missing(&self) -> bool {
        true
    }

    fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
        for field in self.fields.iter() {
            let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
            let index = field.input_field();

            match val.get(index) {
                Some(VrlValue::Bytes(b)) => {
                    if self.match_target(&String::from_utf8_lossy(b)) {
                        return Ok(VrlValue::Null);
                    }
                }
                Some(v) => {
                    return ProcessorExpectStringSnafu {
                        processor: self.kind(),
                        v: v.clone(),
                    }
                    .fail();
                }
                None => {}
            }
        }

        Ok(val)
    }
}

#[cfg(test)]
mod test {
    use ahash::HashSet;
    use vrl::prelude::{Bytes, Value as VrlValue};
    use vrl::value::{KeyString, ObjectMap};

    use crate::etl::field::{Field, Fields};
    use crate::etl::processor::filter::{FilterProcessor, MatchMode, MatchOp};
    use crate::Processor;

    #[test]
    fn test_eq() {
        let processor = FilterProcessor {
            fields: Fields::one(Field::new("name", None)),
            mode: MatchMode::SimpleMatch(MatchOp::In),
            case_insensitive: false,
            targets: HashSet::from_iter(vec!["John".to_string()]),
        };

        let val = VrlValue::Object(ObjectMap::from_iter(vec![(
            KeyString::from("name"),
            VrlValue::Bytes(Bytes::from("John")),
        )]));
        let result = processor.exec_mut(val).unwrap();
        assert_eq!(result, VrlValue::Null);

        let val = VrlValue::Object(ObjectMap::from_iter(vec![(
            KeyString::from("name"),
            VrlValue::Bytes(Bytes::from("Wick")),
        )]));
        let expect = val.clone();
        let result = processor.exec_mut(val).unwrap();
        assert_eq!(result, expect);
    }

    #[test]
    fn test_ne() {
        let processor = FilterProcessor {
            fields: Fields::one(Field::new("name", None)),
            mode: MatchMode::SimpleMatch(MatchOp::NotIn),
            case_insensitive: false,
            targets: HashSet::from_iter(vec!["John".to_string()]),
        };

        let val = VrlValue::Object(ObjectMap::from_iter(vec![(
            KeyString::from("name"),
            VrlValue::Bytes(Bytes::from("John")),
        )]));
        let expect = val.clone();
        let result = processor.exec_mut(val).unwrap();
        assert_eq!(result, expect);

        let val = VrlValue::Object(ObjectMap::from_iter(vec![(
            KeyString::from("name"),
            VrlValue::Bytes(Bytes::from("Wick")),
        )]));
        let result = processor.exec_mut(val).unwrap();
        assert_eq!(result, VrlValue::Null);
    }

    #[test]
    fn test_case() {
        let processor = FilterProcessor {
            fields: Fields::one(Field::new("name", None)),
            mode: MatchMode::SimpleMatch(MatchOp::In),
            case_insensitive: true,
            targets: HashSet::from_iter(vec!["john".to_string()]),
        };

        let val = VrlValue::Object(ObjectMap::from_iter(vec![(
            KeyString::from("name"),
            VrlValue::Bytes(Bytes::from("JoHN")),
        )]));
        let result = processor.exec_mut(val).unwrap();
        assert_eq!(result, VrlValue::Null);
    }
}
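
One note on `match_target` and `try_from` above: case-insensitive matching is implemented by normalizing both sides to lowercase, the targets once at construction time and the input on every lookup, so the hot path stays a single `HashSet` membership test. A standalone sketch of that technique using only `std` (independent of this crate):

use std::collections::HashSet;

fn main() {
    // Lowercase the targets once up front, as FilterProcessor::try_from does.
    let targets: HashSet<String> = ["John", "Jane"]
        .into_iter()
        .map(|s| s.to_lowercase())
        .collect();

    // Lowercase the input per lookup, as match_target does.
    let is_match = |input: &str| targets.contains(&input.to_lowercase());

    assert!(is_match("JoHN"));
    assert!(!is_match("Wick"));
}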


@@ -168,6 +168,9 @@ async fn run_custom_pipeline(
            PipelineExecOutput::DispatchedTo(dispatched_to, val) => {
                push_to_map!(dispatched, dispatched_to, val, arr_len);
            }
            PipelineExecOutput::Filtered => {
                continue;
            }
        }
    }


@@ -112,6 +112,7 @@ macro_rules! http_tests {
                test_pipeline_with_hint_vrl,
                test_pipeline_2,
                test_pipeline_skip_error,
                test_pipeline_filter,
                test_otlp_metrics,
                test_otlp_traces_v0,
@@ -1956,6 +1957,78 @@ transform:
    guard.remove_all().await;
}

pub async fn test_pipeline_filter(store_type: StorageType) {
    common_telemetry::init_default_ut_logging();
    let (app, mut guard) =
        setup_test_http_app_with_frontend(store_type, "test_pipeline_filter").await;

    // handshake
    let client = TestClient::new(app).await;

    let pipeline_body = r#"
processors:
  - date:
      field: time
      formats:
        - "%Y-%m-%d %H:%M:%S%.3f"
  - filter:
      field: name
      targets:
        - John

transform:
  - field: name
    type: string
  - field: time
    type: time
    index: timestamp
"#;

    // 1. create pipeline
    let res = client
        .post("/v1/events/pipelines/test")
        .header("Content-Type", "application/x-yaml")
        .body(pipeline_body)
        .send()
        .await;
    assert_eq!(res.status(), StatusCode::OK);

    // 2. write data
    let data_body = r#"
[
  {
    "time": "2024-05-25 20:16:37.217",
    "name": "John"
  },
  {
    "time": "2024-05-25 20:16:37.218",
    "name": "JoHN"
  },
  {
    "time": "2024-05-25 20:16:37.328",
    "name": "Jane"
  }
]
"#;
    let res = client
        .post("/v1/events/logs?db=public&table=logs1&pipeline_name=test")
        .header("Content-Type", "application/json")
        .body(data_body)
        .send()
        .await;
    assert_eq!(res.status(), StatusCode::OK);

    validate_data(
        "pipeline_filter",
        &client,
        "select * from logs1",
        "[[\"Jane\",1716668197328000000]]",
    )
    .await;

    guard.remove_all().await;
}

pub async fn test_pipeline_dispatcher(storage_type: StorageType) {
    common_telemetry::init_default_ut_logging();
    let (app, mut guard) =