diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs index 4896c3aaea..80ddf18767 100644 --- a/src/pipeline/src/etl.rs +++ b/src/pipeline/src/etl.rs @@ -227,6 +227,7 @@ impl DispatchedTo { pub enum PipelineExecOutput { Transformed(TransformedOutput), DispatchedTo(DispatchedTo, VrlValue), + Filtered, } #[derive(Debug)] @@ -273,6 +274,10 @@ impl Pipeline { // process for processor in self.processors.iter() { val = processor.exec_mut(val)?; + if val.is_null() { + // line is filtered + return Ok(PipelineExecOutput::Filtered); + } } // dispatch, fast return if matched diff --git a/src/pipeline/src/etl/processor.rs b/src/pipeline/src/etl/processor.rs index b2f4285257..761bcd6cba 100644 --- a/src/pipeline/src/etl/processor.rs +++ b/src/pipeline/src/etl/processor.rs @@ -19,6 +19,7 @@ pub mod decolorize; pub mod digest; pub mod dissect; pub mod epoch; +pub mod filter; pub mod gsub; pub mod join; pub mod json_parse; @@ -55,6 +56,7 @@ use crate::error::{ Result, UnsupportedProcessorSnafu, }; use crate::etl::field::{Field, Fields}; +use crate::etl::processor::filter::FilterProcessor; use crate::etl::processor::json_parse::JsonParseProcessor; use crate::etl::processor::select::SelectProcessor; use crate::etl::processor::simple_extract::SimpleExtractProcessor; @@ -146,6 +148,7 @@ pub enum ProcessorKind { Digest(DigestProcessor), Select(SelectProcessor), Vrl(VrlProcessor), + Filter(FilterProcessor), } #[derive(Debug, Default)] @@ -226,6 +229,7 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result { } vrl_processor::PROCESSOR_VRL => ProcessorKind::Vrl(VrlProcessor::try_from(value)?), select::PROCESSOR_SELECT => ProcessorKind::Select(SelectProcessor::try_from(value)?), + filter::PROCESSOR_FILTER => ProcessorKind::Filter(FilterProcessor::try_from(value)?), _ => return UnsupportedProcessorSnafu { processor: str_key }.fail(), }; diff --git a/src/pipeline/src/etl/processor/filter.rs b/src/pipeline/src/etl/processor/filter.rs new file mode 100644 index 0000000000..0dd559095b 
--- /dev/null
+++ b/src/pipeline/src/etl/processor/filter.rs
@@ -0,0 +1,288 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use ahash::{HashSet, HashSetExt};
+use snafu::OptionExt;
+use vrl::prelude::Value as VrlValue;
+
+use crate::error::{
+    Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
+    ValueMustBeMapSnafu,
+};
+use crate::etl::field::Fields;
+use crate::etl::processor::{
+    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, FIELDS_NAME, FIELD_NAME,
+};
+use crate::Processor;
+
+/// Name under which the filter processor is selected in pipeline YAML.
+pub(crate) const PROCESSOR_FILTER: &str = "filter";
+
+// Configuration keys read by `FilterProcessor::try_from`.
+const MATCH_MODE_NAME: &str = "mode";
+const MATCH_OP_NAME: &str = "match_op";
+const CASE_INSENSITIVE_NAME: &str = "case_insensitive";
+const TARGETS_NAME: &str = "targets";
+
+/// How a field value is compared against the configured targets.
+#[derive(Debug)]
+enum MatchMode {
+    /// Exact string membership test, parameterised by the set operation.
+    SimpleMatch(MatchOp),
+}
+
+impl Default for MatchMode {
+    fn default() -> Self {
+        Self::SimpleMatch(MatchOp::default())
+    }
+}
+
+/// Set operation applied by [`MatchMode::SimpleMatch`].
+#[derive(Debug, Default)]
+enum MatchOp {
+    /// Match when the value is one of the targets.
+    #[default]
+    In,
+    /// Match when the value is none of the targets.
+    NotIn,
+}
+
+/// Filter out the whole line if it matches.
+///
+/// A match is signalled by returning [`VrlValue::Null`] from `exec_mut`.
+/// Ultimately it's a condition check; maybe we can use VRL to do more complex checks.
+/// Only simple string matching is implemented for now. Can be extended later.
+#[derive(Debug, Default)]
+pub struct FilterProcessor {
+    /// Input fields whose values are tested.
+    fields: Fields,
+    mode: MatchMode,
+    /// When set, inputs are lowercased before being compared with `targets`.
+    case_insensitive: bool,
+    /// Strings to compare against; lowercased by `try_from` when `case_insensitive` is set.
+    targets: HashSet<String>,
+}
+
+impl TryFrom<&yaml_rust::yaml::Hash> for FilterProcessor {
+    type Error = Error;
+
+    /// Builds a filter processor from its YAML configuration.
+    ///
+    /// Returns an error when a key is not a string, when a `yaml_*` helper rejects a
+    /// value, or when no input field or no non-empty target is configured.
+    // match mode can be extended in the future
+    #[allow(clippy::single_match)]
+    fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
+        let mut fields = Fields::default();
+        let mut mode = MatchMode::default();
+        let mut op = MatchOp::default();
+        let mut case_insensitive = true;
+        let mut targets = HashSet::new();
+
+        for (k, v) in value.iter() {
+            let key = k
+                .as_str()
+                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
+            match key {
+                FIELD_NAME => fields = Fields::one(yaml_new_field(v, FIELD_NAME)?),
+                FIELDS_NAME => fields = yaml_new_fields(v, FIELDS_NAME)?,
+                // NOTE(review): unrecognised `mode` / `match_op` values are silently
+                // ignored, so a typo leaves the default (`simple` / `in`) in effect.
+                MATCH_MODE_NAME => match yaml_string(v, MATCH_MODE_NAME)?.as_str() {
+                    "simple" => mode = MatchMode::SimpleMatch(MatchOp::In),
+                    _ => {}
+                },
+                MATCH_OP_NAME => match yaml_string(v, MATCH_OP_NAME)?.as_str() {
+                    "in" => op = MatchOp::In,
+                    "not_in" => op = MatchOp::NotIn,
+                    _ => {}
+                },
+                CASE_INSENSITIVE_NAME => case_insensitive = yaml_bool(v, CASE_INSENSITIVE_NAME)?,
+                TARGETS_NAME => targets.extend(
+                    yaml_strings(v, TARGETS_NAME)?
+                        .into_iter()
+                        .filter(|s| !s.is_empty()),
+                ),
+                _ => {}
+            }
+        }
+
+        // `match_op` may come before or after `mode`, so combine them only once
+        // every key has been read.
+        if matches!(mode, MatchMode::SimpleMatch(_)) {
+            mode = MatchMode::SimpleMatch(op);
+        }
+
+        // Without input fields `exec_mut` would never inspect anything.
+        if fields.iter().next().is_none() {
+            return ProcessorMissingFieldSnafu {
+                processor: PROCESSOR_FILTER,
+                field: format!("{} or {}", FIELD_NAME, FIELDS_NAME),
+            }
+            .fail();
+        }
+
+        if targets.is_empty() {
+            return ProcessorMissingFieldSnafu {
+                processor: PROCESSOR_FILTER,
+                field: TARGETS_NAME.to_string(),
+            }
+            .fail();
+        }
+
+        // `case_insensitive` may appear after `targets`, so normalise at the end.
+        if case_insensitive {
+            targets = targets.into_iter().map(|s| s.to_lowercase()).collect();
+        }
+
+        Ok(FilterProcessor {
+            fields,
+            mode,
+            case_insensitive,
+            targets,
+        })
+    }
+}
+
+impl FilterProcessor {
+    /// Returns `true` when `input` satisfies the configured match.
+    fn match_target(&self, input: &str) -> bool {
+        let found = if self.case_insensitive {
+            self.targets.contains(input.to_lowercase().as_str())
+        } else {
+            self.targets.contains(input)
+        };
+
+        match &self.mode {
+            MatchMode::SimpleMatch(MatchOp::In) => found,
+            MatchMode::SimpleMatch(MatchOp::NotIn) => !found,
+        }
+    }
+}
+
+impl Processor for FilterProcessor {
+    fn kind(&self) -> &str {
+        PROCESSOR_FILTER
+    }
+
+    fn ignore_missing(&self) -> bool {
+        true
+    }
+
+    /// Returns [`VrlValue::Null`] as soon as one configured field matches, otherwise
+    /// hands `val` back untouched. Absent fields are skipped; a present field that
+    /// is not `Bytes` is an error.
+    fn exec_mut(&self, val: VrlValue) -> Result<VrlValue> {
+        for field in self.fields.iter() {
+            // Read-only access is enough: the value is either dropped or returned as is.
+            let map = val.as_object().context(ValueMustBeMapSnafu)?;
+            match map.get(field.input_field()) {
+                Some(VrlValue::Bytes(b)) => {
+                    if self.match_target(&String::from_utf8_lossy(b)) {
+                        return Ok(VrlValue::Null);
+                    }
+                }
+                Some(v) => {
+                    return ProcessorExpectStringSnafu {
+                        processor: self.kind(),
+                        v: v.clone(),
+                    }
+                    .fail();
+                }
+                None => {}
+            }
+        }
+
+        Ok(val)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use ahash::HashSet;
+    use vrl::prelude::{Bytes, Value as VrlValue};
+    use vrl::value::{KeyString, ObjectMap};
+
+    use crate::etl::field::{Field, Fields};
+    use crate::etl::processor::filter::{FilterProcessor, MatchMode, MatchOp};
+    use crate::Processor;
+
+    #[test]
+    fn test_eq() {
+        let processor = FilterProcessor {
+            fields: Fields::one(Field::new("name", None)),
+            mode: MatchMode::SimpleMatch(MatchOp::In),
+            case_insensitive: false,
+            targets: HashSet::from_iter(vec!["John".to_string()]),
+        };
+
+        let val = VrlValue::Object(ObjectMap::from_iter(vec![(
+            KeyString::from("name"),
+            VrlValue::Bytes(Bytes::from("John")),
+        )]));
+
+        let result = processor.exec_mut(val).unwrap();
+        assert_eq!(result, VrlValue::Null);
+
+        let val = VrlValue::Object(ObjectMap::from_iter(vec![(
+            KeyString::from("name"),
+            VrlValue::Bytes(Bytes::from("Wick")),
+        )]));
+        let expect = val.clone();
+        let result = processor.exec_mut(val).unwrap();
+        assert_eq!(result, expect);
+    }
+
+    #[test]
+    fn test_ne() {
+        let processor = FilterProcessor {
+            fields: Fields::one(Field::new("name", None)),
+            mode: MatchMode::SimpleMatch(MatchOp::NotIn),
+            case_insensitive: false,
+            targets: HashSet::from_iter(vec!["John".to_string()]),
+        };
+
+        let val = VrlValue::Object(ObjectMap::from_iter(vec![(
+            KeyString::from("name"),
+            VrlValue::Bytes(Bytes::from("John")),
+        )]));
+        let expect = val.clone();
+        let result = processor.exec_mut(val).unwrap();
+        assert_eq!(result, expect);
+
+        let val = VrlValue::Object(ObjectMap::from_iter(vec![(
+            KeyString::from("name"),
+            VrlValue::Bytes(Bytes::from("Wick")),
+        )]));
+        let result = processor.exec_mut(val).unwrap();
+        assert_eq!(result, VrlValue::Null);
+    }
+
+    #[test]
+    fn test_case() {
+        let processor = FilterProcessor {
+            fields: Fields::one(Field::new("name", None)),
+            mode: MatchMode::SimpleMatch(MatchOp::In),
+            case_insensitive: true,
+            targets: HashSet::from_iter(vec!["john".to_string()]),
+        };
+
+        let val = VrlValue::Object(ObjectMap::from_iter(vec![(
+            KeyString::from("name"),
+            VrlValue::Bytes(Bytes::from("JoHN")),
+        )]));
+        let result = processor.exec_mut(val).unwrap();
+        assert_eq!(result, VrlValue::Null);
+    }
+}
diff --git a/src/servers/src/pipeline.rs b/src/servers/src/pipeline.rs
index ea504ddf63..111bc7d5a2 100644
--- a/src/servers/src/pipeline.rs
+++ b/src/servers/src/pipeline.rs
@@ -168,6 +168,9 @@ async fn run_custom_pipeline(
             PipelineExecOutput::DispatchedTo(dispatched_to, val) => {
push_to_map!(dispatched, dispatched_to, val, arr_len); } + PipelineExecOutput::Filtered => { + continue; + } } } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index f95feee608..4d9cfeac91 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -112,6 +112,7 @@ macro_rules! http_tests { test_pipeline_with_hint_vrl, test_pipeline_2, test_pipeline_skip_error, + test_pipeline_filter, test_otlp_metrics, test_otlp_traces_v0, @@ -1956,6 +1957,78 @@ transform: guard.remove_all().await; } +pub async fn test_pipeline_filter(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = + setup_test_http_app_with_frontend(store_type, "test_pipeline_filter").await; + + // handshake + let client = TestClient::new(app).await; + + let pipeline_body = r#" +processors: + - date: + field: time + formats: + - "%Y-%m-%d %H:%M:%S%.3f" + - filter: + field: name + targets: + - John +transform: + - field: name + type: string + - field: time + type: time + index: timestamp +"#; + + // 1. create pipeline + let res = client + .post("/v1/events/pipelines/test") + .header("Content-Type", "application/x-yaml") + .body(pipeline_body) + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + + // 2. 
write data + let data_body = r#" +[ + { + "time": "2024-05-25 20:16:37.217", + "name": "John" + }, + { + "time": "2024-05-25 20:16:37.218", + "name": "JoHN" + }, + { + "time": "2024-05-25 20:16:37.328", + "name": "Jane" + } +] +"#; + + let res = client + .post("/v1/events/logs?db=public&table=logs1&pipeline_name=test") + .header("Content-Type", "application/json") + .body(data_body) + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + + validate_data( + "pipeline_filter", + &client, + "select * from logs1", + "[[\"Jane\",1716668197328000000]]", + ) + .await; + + guard.remove_all().await; +} + pub async fn test_pipeline_dispatcher(storage_type: StorageType) { common_telemetry::init_default_ut_logging(); let (app, mut guard) =