From 32bffbb6687f058298df0f72201344ee21ae136a Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Mon, 14 Jul 2025 17:43:49 +0800 Subject: [PATCH] feat: add filter processor to v0.15 (#6516) feat: add filter processor Signed-off-by: shuiyisong --- src/pipeline/src/etl.rs | 5 + src/pipeline/src/etl/processor.rs | 4 + src/pipeline/src/etl/processor/filter.rs | 242 +++++++++++++++++++++++ src/servers/src/pipeline.rs | 3 + tests-integration/tests/http.rs | 73 +++++++ 5 files changed, 327 insertions(+) create mode 100644 src/pipeline/src/etl/processor/filter.rs diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs index 40210e5662..861c94736b 100644 --- a/src/pipeline/src/etl.rs +++ b/src/pipeline/src/etl.rs @@ -229,6 +229,7 @@ impl DispatchedTo { pub enum PipelineExecOutput { Transformed(TransformedOutput), DispatchedTo(DispatchedTo, Value), + Filtered, } #[derive(Debug)] @@ -309,6 +310,10 @@ impl Pipeline { // process for processor in self.processors.iter() { val = processor.exec_mut(val)?; + if val.is_null() { + // line is filtered + return Ok(PipelineExecOutput::Filtered); + } } // dispatch, fast return if matched diff --git a/src/pipeline/src/etl/processor.rs b/src/pipeline/src/etl/processor.rs index 9d3e1d5c0f..b2d0ecec08 100644 --- a/src/pipeline/src/etl/processor.rs +++ b/src/pipeline/src/etl/processor.rs @@ -19,6 +19,7 @@ pub mod decolorize; pub mod digest; pub mod dissect; pub mod epoch; +pub mod filter; pub mod gsub; pub mod join; pub mod json_parse; @@ -54,6 +55,7 @@ use crate::error::{ Result, UnsupportedProcessorSnafu, }; use crate::etl::field::{Field, Fields}; +use crate::etl::processor::filter::FilterProcessor; use crate::etl::processor::json_parse::JsonParseProcessor; use crate::etl::processor::select::SelectProcessor; use crate::etl::processor::simple_extract::SimpleExtractProcessor; @@ -146,6 +148,7 @@ pub enum ProcessorKind { Digest(DigestProcessor), Select(SelectProcessor), Vrl(VrlProcessor), + 
Filter(FilterProcessor), } #[derive(Debug, Default)] @@ -226,6 +229,7 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result { } vrl::PROCESSOR_VRL => ProcessorKind::Vrl(VrlProcessor::try_from(value)?), select::PROCESSOR_SELECT => ProcessorKind::Select(SelectProcessor::try_from(value)?), + filter::PROCESSOR_FILTER => ProcessorKind::Filter(FilterProcessor::try_from(value)?), _ => return UnsupportedProcessorSnafu { processor: str_key }.fail(), }; diff --git a/src/pipeline/src/etl/processor/filter.rs b/src/pipeline/src/etl/processor/filter.rs new file mode 100644 index 0000000000..423160081e --- /dev/null +++ b/src/pipeline/src/etl/processor/filter.rs @@ -0,0 +1,242 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+use ahash::{HashSet, HashSetExt};
+use snafu::OptionExt;
+
+use crate::error::{
+    Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
+    ValueMustBeMapSnafu,
+};
+use crate::etl::field::Fields;
+use crate::etl::processor::{
+    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, FIELDS_NAME, FIELD_NAME,
+};
+use crate::{Processor, Value};
+
+/// Name under which this processor is selected in a pipeline definition.
+pub(crate) const PROCESSOR_FILTER: &str = "filter";
+
+// Option keys understood by the filter processor.
+const MATCH_MODE_NAME: &str = "mode";
+const MATCH_OP_NAME: &str = "match_op";
+const CASE_INSENSITIVE_NAME: &str = "case_insensitive";
+const TARGETS_NAME: &str = "targets";
+
+/// Strategy used to compare a field value with the configured targets.
+#[derive(Debug)]
+enum MatchMode {
+    /// Plain string membership test, parameterized by the operator.
+    SimpleMatch(MatchOp),
+}
+
+impl Default for MatchMode {
+    /// Simple matching with the default operator.
+    fn default() -> Self {
+        Self::SimpleMatch(Default::default())
+    }
+}
+
+/// Operator applied by [`MatchMode::SimpleMatch`].
+#[derive(Debug, Default)]
+enum MatchOp {
+    /// The line is filtered out when the field value is one of the targets.
+    #[default]
+    In,
+    /// The line is filtered out when the field value is not one of the targets.
+    NotIn,
+}
+
+/// Processor that drops the whole line when one of its configured fields matches.
+/// This is ultimately a condition check; VRL could be used for more complex checks.
+/// Only simple string matching is implemented for now. Can be extended later.
+#[derive(Debug, Default)]
+pub struct FilterProcessor {
+    /// Input fields whose string values are checked against `targets`.
+    fields: Fields,
+    /// How a field value is compared with `targets`.
+    mode: MatchMode,
+    /// When true, inspected values are lowercased before the comparison.
+    /// Note: the derived `Default` yields `false`, while the YAML default is `true`.
+    case_insensitive: bool,
+    /// Values to compare against; lowercased at build time when `case_insensitive` is set.
+    targets: HashSet<String>,
+}
+
+impl TryFrom<&yaml_rust::yaml::Hash> for FilterProcessor {
+    type Error = Error;
+
+    /// Builds a filter processor from its YAML options.
+    ///
+    /// Recognized keys are `FIELD_NAME` / `FIELDS_NAME`, `mode` (`simple`),
+    /// `match_op` (`in` / `not_in`), `case_insensitive` (defaults to `true`) and
+    /// `targets`. Unrecognized keys are ignored. Empty target strings are dropped.
+    ///
+    /// # Errors
+    /// Fails when a key is not a string, when one of the `yaml_*` helpers rejects an
+    /// option value, or when no non-empty target is configured.
+    // match mode can be extended in the future
+    #[allow(clippy::single_match)]
+    fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
+        let mut fields = Fields::default();
+        let mut mode = MatchMode::default();
+        let mut op = MatchOp::default();
+        let mut case_insensitive = true;
+        let mut targets = HashSet::new();
+
+        for (k, v) in value.iter() {
+            let key = k
+                .as_str()
+                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
+            match key {
+                FIELD_NAME => fields = Fields::one(yaml_new_field(v, FIELD_NAME)?),
+                FIELDS_NAME => fields = yaml_new_fields(v, FIELDS_NAME)?,
+                // NOTE(review): an unrecognized mode is silently ignored and the default kept.
+                MATCH_MODE_NAME => match yaml_string(v, MATCH_MODE_NAME)?.as_str() {
+                    "simple" => mode = MatchMode::SimpleMatch(MatchOp::In),
+                    _ => {}
+                },
+                // NOTE(review): an unrecognized operator (e.g. a typo such as `notin`) is
+                // silently ignored, so the default `in` stays in effect.
+                MATCH_OP_NAME => match yaml_string(v, MATCH_OP_NAME)?.as_str() {
+                    "in" => op = MatchOp::In,
+                    "not_in" => op = MatchOp::NotIn,
+                    _ => {}
+                },
+                CASE_INSENSITIVE_NAME => case_insensitive = yaml_bool(v, CASE_INSENSITIVE_NAME)?,
+                TARGETS_NAME => targets.extend(
+                    yaml_strings(v, TARGETS_NAME)?
+                        .into_iter()
+                        .filter(|s| !s.is_empty()),
+                ),
+                _ => {}
+            }
+        }
+
+        // `mode` and `match_op` may appear in any order, so the operator is applied last.
+        // Exhaustive on purpose: a new `MatchMode` variant must be handled here explicitly.
+        let mode = match mode {
+            MatchMode::SimpleMatch(_) => MatchMode::SimpleMatch(op),
+        };
+
+        if targets.is_empty() {
+            return ProcessorMissingFieldSnafu {
+                processor: PROCESSOR_FILTER,
+                field: TARGETS_NAME.to_string(),
+            }
+            .fail();
+        }
+
+        // Done after the loop because `case_insensitive` may be parsed after `targets`.
+        if case_insensitive {
+            targets = targets.into_iter().map(|s| s.to_lowercase()).collect();
+        }
+
+        Ok(FilterProcessor {
+            fields,
+            mode,
+            case_insensitive,
+            targets,
+        })
+    }
+}
+
+impl FilterProcessor {
+    /// Returns true when `input` should cause the line to be filtered out.
+    ///
+    /// The value is borrowed so callers do not have to clone it; a lowercased copy is
+    /// allocated only in case-insensitive mode.
+    fn match_target(&self, input: &str) -> bool {
+        let hit = if self.case_insensitive {
+            self.targets.contains(input.to_lowercase().as_str())
+        } else {
+            self.targets.contains(input)
+        };
+
+        match &self.mode {
+            MatchMode::SimpleMatch(MatchOp::In) => hit,
+            MatchMode::SimpleMatch(MatchOp::NotIn) => !hit,
+        }
+    }
+}
+
+impl Processor for FilterProcessor {
+    fn kind(&self) -> &str {
+        PROCESSOR_FILTER
+    }
+
+    fn ignore_missing(&self) -> bool {
+        true
+    }
+
+    /// Returns `Value::Null` as soon as one configured field holds a matching string,
+    /// otherwise hands `val` back unchanged.
+    ///
+    /// Fields that are absent from the map are skipped.
+    ///
+    /// # Errors
+    /// Fails when `val` is not a map or when a configured field holds a non-string value.
+    fn exec_mut(&self, mut val: Value) -> Result<Value> {
+        let v_map = val.as_map_mut().context(ValueMustBeMapSnafu)?;
+
+        for field in self.fields.iter() {
+            let index = field.input_field();
+            match v_map.get(index) {
+                Some(Value::String(s)) => {
+                    if self.match_target(s) {
+                        return Ok(Value::Null);
+                    }
+                }
+                Some(v) => {
+                    return ProcessorExpectStringSnafu {
+                        processor: self.kind(),
+                        v: v.clone(),
+                    }
+                    .fail();
+                }
+                None => {}
+            }
+        }
+
+        Ok(val)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use ahash::HashSet;
+
+    use crate::etl::field::{Field, Fields};
+    use crate::etl::processor::filter::{FilterProcessor, MatchMode, MatchOp};
+    use crate::{Map, Processor, Value};
+
+    #[test]
+    fn test_eq() {
+        let processor = FilterProcessor {
+            fields: Fields::one(Field::new("name", None)),
+            mode: MatchMode::SimpleMatch(MatchOp::In),
+            case_insensitive: false,
+            targets: HashSet::from_iter(vec!["John".to_string()]),
+        };
+
+        let val = Value::Map(Map::one("name", Value::String("John".to_string())));
+
+        let result = processor.exec_mut(val).unwrap();
+        assert_eq!(result, Value::Null);
+
+        let val = Value::Map(Map::one("name", Value::String("Wick".to_string())));
+        let expect = val.clone();
+        let result = processor.exec_mut(val).unwrap();
+        assert_eq!(result, expect);
+    }
+
+    #[test]
+    fn test_ne() {
+        let processor = FilterProcessor {
+            fields: Fields::one(Field::new("name", None)),
+            mode: MatchMode::SimpleMatch(MatchOp::NotIn),
+            case_insensitive: false,
+            targets: HashSet::from_iter(vec!["John".to_string()]),
+        };
+
+        let val = Value::Map(Map::one("name", Value::String("John".to_string())));
+        let expect = val.clone();
+        let result = processor.exec_mut(val).unwrap();
+        assert_eq!(result, expect);
+
+        let val = Value::Map(Map::one("name", Value::String("Wick".to_string())));
+        let result = processor.exec_mut(val).unwrap();
+        assert_eq!(result, Value::Null);
+    }
+
+    #[test]
+    fn test_case() {
+        let processor = FilterProcessor {
+            fields: Fields::one(Field::new("name", None)),
+            mode: MatchMode::SimpleMatch(MatchOp::In),
+            case_insensitive: true,
+            targets: HashSet::from_iter(vec!["john".to_string()]),
+        };
+
+        let val = Value::Map(Map::one("name", Value::String("JoHN".to_string())));
+        let result = processor.exec_mut(val).unwrap();
+        assert_eq!(result, Value::Null);
+    }
+
+    #[test]
+    fn test_case_sensitive() {
+        let processor = FilterProcessor {
+            fields: Fields::one(Field::new("name", None)),
+            mode: MatchMode::SimpleMatch(MatchOp::In),
+            case_insensitive: false,
+            targets: HashSet::from_iter(vec!["john".to_string()]),
+        };
+
+        // Differs only by case, so a case-sensitive filter must let the line through.
+        let val = Value::Map(Map::one("name", Value::String("JoHN".to_string())));
+        let expect = val.clone();
+        let result = processor.exec_mut(val).unwrap();
+        assert_eq!(result, expect);
+    }
+}
diff --git a/src/servers/src/pipeline.rs b/src/servers/src/pipeline.rs
index 2ddab66728..9d9217713f 100644
--- a/src/servers/src/pipeline.rs
+++ b/src/servers/src/pipeline.rs
@@ -167,6 +167,9 @@ async fn run_custom_pipeline(
                 PipelineExecOutput::DispatchedTo(dispatched_to, val) => {
                     push_to_map!(dispatched, dispatched_to, val, arr_len);
                 }
+                PipelineExecOutput::Filtered => {
+                    continue;
+                }
             }
         }
 
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 5c903e7ab1..aabfafa9c1 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -112,6 +112,7 @@ macro_rules! http_tests {
                 test_pipeline_with_hint_vrl,
                 test_pipeline_2,
                 test_pipeline_skip_error,
+                test_pipeline_filter,
 
                 test_otlp_metrics,
                 test_otlp_traces_v0,
@@ -1945,6 +1946,78 @@ transform:
     guard.remove_all().await;
 }
 
+pub async fn test_pipeline_filter(store_type: StorageType) {
+    common_telemetry::init_default_ut_logging();
+    let (app, mut guard) =
+        setup_test_http_app_with_frontend(store_type, "test_pipeline_filter").await;
+
+    // handshake
+    let client = TestClient::new(app).await;
+
+    let pipeline_body = r#"
+processors:
+  - date:
+      field: time
+      formats:
+        - "%Y-%m-%d %H:%M:%S%.3f"
+  - filter:
+      field: name
+      targets:
+        - John
+transform:
+  - field: name
+    type: string
+  - field: time
+    type: time
+    index: timestamp
+"#;
+
+    // 1. create pipeline
+    let res = client
+        .post("/v1/events/pipelines/test")
+        .header("Content-Type", "application/x-yaml")
+        .body(pipeline_body)
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+
+    // 2. write data
+    let data_body = r#"
+[
+  {
+    "time": "2024-05-25 20:16:37.217",
+    "name": "John"
+  },
+  {
+    "time": "2024-05-25 20:16:37.218",
+    "name": "JoHN"
+  },
+  {
+    "time": "2024-05-25 20:16:37.328",
+    "name": "Jane"
+  }
+]
+"#;
+
+    let res = client
+        .post("/v1/events/logs?db=public&table=logs1&pipeline_name=test")
+        .header("Content-Type", "application/json")
+        .body(data_body)
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+
+    validate_data(
+        "pipeline_filter",
+        &client,
+        "select * from logs1",
+        "[[\"Jane\",1716668197328000000]]",
+    )
+    .await;
+
+    guard.remove_all().await;
+}
+
 pub async fn test_pipeline_dispatcher(storage_type: StorageType) {
     common_telemetry::init_default_ut_logging();
     let (app, mut guard) =