From 56f31d5933cf5b06d135f70f387c249bf557073b Mon Sep 17 00:00:00 2001
From: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
Date: Wed, 7 May 2025 11:40:11 +0800
Subject: [PATCH] feat(pipeline): select processor (#6019)

* feat: support auto transform
* refactor: replace hashbrown with ahash
* refactor: params of run identity pipeline
* refactor: minor update
* test: add test for auto transform
* feat: add select processor
* test: select processor
* chore: use include and exclude for key
* fix: typos
* chore: address CR comment
* chore: typo
* chore: typo
* chore: address CR comment
* chore: use with_context
---
 src/pipeline/src/error.rs                    |  11 +
 src/pipeline/src/etl/processor.rs            |  72 ++++--
 src/pipeline/src/etl/processor/select.rs     | 185 ++++++++++++++++
 .../src/etl/processor/simple_extract.rs      |   6 +-
 tests-integration/tests/http.rs              | 208 ++++++++++++++++--
 5 files changed, 445 insertions(+), 37 deletions(-)
 create mode 100644 src/pipeline/src/etl/processor/select.rs
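
Reviewer note: the snippet below is a minimal sketch of how the new select processor is configured, assembled from the integration tests added in tests-integration/tests/http.rs (field and table names are illustrative only). select defaults to type: include and keeps only the listed keys; a list entry may also be a key/rename_to pair so the kept key is written out under a new name. With type: exclude the listed keys are dropped and everything else is kept.

    processors:
      - dissect:
          fields:
            - message
          patterns:
            - "%{+ts} %{+ts} %{http_status_code} %{content}"
      - date:
          fields:
            - ts
          formats:
            - "%Y-%m-%d %H:%M:%S%.3f"
      # include is the default: keep ts, and keep http_status_code under the new name s_code
      - select:
          fields:
            - ts
            - key: http_status_code
              rename_to: s_code

The exclude variant lists the keys to drop instead:

      - select:
          type: exclude
          fields:
            - http_status_code
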
diff --git a/src/pipeline/src/error.rs b/src/pipeline/src/error.rs
index 72edfa8e03..0172924db5 100644
--- a/src/pipeline/src/error.rs
+++ b/src/pipeline/src/error.rs
@@ -35,6 +35,16 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display(
+        "Field renaming must be a string pair of 'key' and 'rename_to', got: {value:?}"
+    ))]
+    InvalidFieldRename {
+        value: yaml_rust::Yaml,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Processor must be a map"))]
     ProcessorMustBeMap {
         #[snafu(implicit)]
         location: Location,
     },
@@ -748,6 +758,7 @@ impl ErrorExt for Error {

             EmptyInputField { .. }
             | MissingInputField { .. }
+            | InvalidFieldRename { .. }
             | ProcessorMustBeMap { .. }
             | ProcessorMissingField { .. }
             | ProcessorExpectString { .. }
diff --git a/src/pipeline/src/etl/processor.rs b/src/pipeline/src/etl/processor.rs
index aa10fd9e78..0b553c3c9c 100644
--- a/src/pipeline/src/etl/processor.rs
+++ b/src/pipeline/src/etl/processor.rs
@@ -25,10 +25,13 @@ pub mod json_parse;
 pub mod json_path;
 pub mod letter;
 pub mod regex;
+pub mod select;
 pub mod simple_extract;
 pub mod timestamp;
 pub mod urlencoding;

+use std::str::FromStr;
+
 use cmcd::CmcdProcessor;
 use csv::CsvProcessor;
 use date::DateProcessor;
@@ -47,11 +50,13 @@ use timestamp::TimestampProcessor;
 use urlencoding::UrlEncodingProcessor;

 use crate::error::{
-    Error, FailedParseFieldFromStringSnafu, FieldMustBeTypeSnafu, ProcessorKeyMustBeStringSnafu,
-    ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu, Result, UnsupportedProcessorSnafu,
+    Error, FailedParseFieldFromStringSnafu, FieldMustBeTypeSnafu, InvalidFieldRenameSnafu,
+    ProcessorKeyMustBeStringSnafu, ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu,
+    Result, UnsupportedProcessorSnafu,
 };
 use crate::etl::field::{Field, Fields};
 use crate::etl::processor::json_parse::JsonParseProcessor;
+use crate::etl::processor::select::SelectProcessor;
 use crate::etl::processor::simple_extract::SimpleExtractProcessor;
 use crate::etl::PipelineMap;
@@ -65,7 +70,43 @@ const SEPARATOR_NAME: &str = "separator";
 const TARGET_FIELDS_NAME: &str = "target_fields";
 const JSON_PATH_NAME: &str = "json_path";
 const JSON_PATH_RESULT_INDEX_NAME: &str = "result_index";
-const SIMPLE_EXTRACT_KEY_NAME: &str = "key";
+const KEY_NAME: &str = "key";
+const TYPE_NAME: &str = "type";
+const RENAME_TO_KEY: &str = "rename_to";
+
+/// Macro to extract a string value from a YAML map
+#[macro_export]
+macro_rules! yaml_map_get_str {
+    ($map:expr, $key:expr, $value:expr) => {
+        $map.get(&yaml_rust::Yaml::String($key.to_string()))
+            .and_then(|v| v.as_str())
+            .with_context(|| InvalidFieldRenameSnafu {
+                value: $value.clone(),
+            })
+    };
+}
+
+lazy_static::lazy_static! {
+    static ref STRING_FN: fn(&str, &yaml_rust::Yaml) -> Result<String> = |_, v| {
+        Ok(v.as_str().unwrap_or_default().into())
+    };
+
+    static ref STRING_OR_HASH_FN: fn(&str, &yaml_rust::Yaml) -> Result<Field> = |field, v| {
+        match v {
+            yaml_rust::Yaml::String(s) => Field::from_str(s),
+            yaml_rust::Yaml::Hash(m) => {
+                let key = yaml_map_get_str!(m, KEY_NAME, v)?;
+                let rename_to = yaml_map_get_str!(m, RENAME_TO_KEY, v)?;
+                Ok(Field::new(key, Some(rename_to.to_string())))
+            }
+            _ => FieldMustBeTypeSnafu {
+                field,
+                ty: "string or key-rename_to map",
+            }
+            .fail(),
+        }
+    };
+}

 /// Processor trait defines the interface for all processors.
 ///
@@ -104,6 +145,7 @@ pub enum ProcessorKind {
     SimpleJsonPath(SimpleExtractProcessor),
     Decolorize(DecolorizeProcessor),
     Digest(DigestProcessor),
+    Select(SelectProcessor),
 }

 #[derive(Debug, Default)]
@@ -185,6 +227,7 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorKind> {
         json_parse::PROCESSOR_JSON_PARSE => {
             ProcessorKind::JsonParse(JsonParseProcessor::try_from(value)?)
         }
+        select::PROCESSOR_SELECT => ProcessorKind::Select(SelectProcessor::try_from(value)?),
         _ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
     };

@@ -201,16 +244,19 @@ pub(crate) fn yaml_string(v: &yaml_rust::Yaml, field: &str) -> Result<String> {
 }

 pub(crate) fn yaml_strings(v: &yaml_rust::Yaml, field: &str) -> Result<Vec<String>> {
-    let vec = v
-        .as_vec()
-        .context(FieldMustBeTypeSnafu {
-            field,
-            ty: "list of string",
-        })?
+    yaml_list(v, *STRING_FN, field)
+}
+
+pub(crate) fn yaml_list<T>(
+    v: &yaml_rust::Yaml,
+    conv_fn: impl Fn(&str, &yaml_rust::Yaml) -> Result<T>,
+    field: &str,
+) -> Result<Vec<T>> {
+    v.as_vec()
+        .context(FieldMustBeTypeSnafu { field, ty: "list" })?
         .iter()
-        .map(|v| v.as_str().unwrap_or_default().into())
-        .collect();
-    Ok(vec)
+        .map(|v| conv_fn(field, v))
+        .collect()
 }

 pub(crate) fn yaml_bool(v: &yaml_rust::Yaml, field: &str) -> Result<bool> {
@@ -248,7 +294,7 @@ where
 }

 pub(crate) fn yaml_new_fields(v: &yaml_rust::Yaml, field: &str) -> Result<Fields> {
-    yaml_parse_strings(v, field).map(Fields::new)
+    yaml_list(v, *STRING_OR_HASH_FN, field).map(Fields::new)
 }

 pub(crate) fn yaml_new_field(v: &yaml_rust::Yaml, field: &str) -> Result<Field> {
diff --git a/src/pipeline/src/etl/processor/select.rs b/src/pipeline/src/etl/processor/select.rs
new file mode 100644
index 0000000000..cbaeae0e9a
--- /dev/null
+++ b/src/pipeline/src/etl/processor/select.rs
@@ -0,0 +1,185 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use ahash::{HashSet, HashSetExt};
+use snafu::OptionExt;
+
+use crate::error::{Error, KeyMustBeStringSnafu, ProcessorUnsupportedValueSnafu, Result};
+use crate::etl::field::Fields;
+use crate::etl::processor::{
+    yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME, TYPE_NAME,
+};
+use crate::{PipelineMap, Processor};
+
+pub(crate) const PROCESSOR_SELECT: &str = "select";
+const INCLUDE_KEY: &str = "include";
+const EXCLUDE_KEY: &str = "exclude";
+
+#[derive(Debug, Default)]
+pub enum SelectType {
+    #[default]
+    Include,
+    Exclude,
+}
+
+impl TryFrom<String> for SelectType {
+    type Error = Error;
+
+    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
+        match value.as_str() {
+            INCLUDE_KEY => Ok(SelectType::Include),
+            EXCLUDE_KEY => Ok(SelectType::Exclude),
+            _ => ProcessorUnsupportedValueSnafu {
+                processor: PROCESSOR_SELECT.to_string(),
+                val: format!("'{}', expect '{}' or '{}'", value, INCLUDE_KEY, EXCLUDE_KEY),
+            }
+            .fail(),
+        }
+    }
+}
+
+#[derive(Debug, Default)]
+pub struct SelectProcessor {
+    fields: Fields,
+    select_type: SelectType,
+}
+
+impl TryFrom<&yaml_rust::yaml::Hash> for SelectProcessor {
+    type Error = Error;
+
+    fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
+        let mut fields = Fields::default();
+        let mut select_type = SelectType::default();
+
+        for (k, v) in value.iter() {
+            let key = k
+                .as_str()
+                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
+            match key {
+                FIELD_NAME => {
+                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
+                }
+                FIELDS_NAME => {
+                    fields = yaml_new_fields(v, FIELDS_NAME)?;
+                }
+                TYPE_NAME => {
+                    select_type = SelectType::try_from(yaml_string(v, TYPE_NAME)?)?;
+                }
+                _ => {}
+            }
+        }
+
+        Ok(SelectProcessor {
+            fields,
+            select_type,
+        })
+    }
+}
+
+impl Processor for SelectProcessor {
+    fn kind(&self) -> &str {
+        PROCESSOR_SELECT
+    }
+
+    fn ignore_missing(&self) -> bool {
+        true
+    }
+
+    fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
+        match self.select_type {
+            SelectType::Include => {
+                let mut include_key_set = HashSet::with_capacity(val.len());
+                for field in self.fields.iter() {
+                    // If the field has a target, move the value to the target
+                    let field_name = field.input_field();
+                    if let Some(target_name) = field.target_field() {
+                        if let Some(v) = val.remove(field_name) {
+                            val.insert(target_name.to_string(), v);
+                        }
+                        include_key_set.insert(target_name);
+                    } else {
+                        include_key_set.insert(field_name);
+                    }
+                }
+                val.retain(|k, _| include_key_set.contains(k.as_str()));
+            }
+            SelectType::Exclude => {
+                for field in self.fields.iter() {
+                    val.remove(field.input_field());
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use crate::etl::field::{Field, Fields};
+    use crate::etl::processor::select::{SelectProcessor, SelectType};
+    use crate::{PipelineMap, Processor, Value};
+
+    #[test]
+    fn test_select() {
+        let processor = SelectProcessor {
+            fields: Fields::one(Field::new("hello", None)),
+            select_type: SelectType::Include,
+        };
+
+        let mut p = PipelineMap::new();
+        p.insert("hello".to_string(), Value::String("world".to_string()));
+        p.insert("hello2".to_string(), Value::String("world2".to_string()));
+
+        let result = processor.exec_mut(&mut p);
+        assert!(result.is_ok());
+        assert_eq!(p.len(), 1);
+        assert_eq!(p.get("hello"), Some(&Value::String("world".to_string())));
+    }
+
+    #[test]
+    fn test_select_with_target() {
+        let processor = SelectProcessor {
+            fields: Fields::one(Field::new("hello", Some("hello3".to_string()))),
+            select_type: SelectType::Include,
+        };
+
+        let mut p = PipelineMap::new();
+        p.insert("hello".to_string(), Value::String("world".to_string()));
+        p.insert("hello2".to_string(), Value::String("world2".to_string()));
+
+        let result = processor.exec_mut(&mut p);
+        assert!(result.is_ok());
+        assert_eq!(p.len(), 1);
+        assert_eq!(p.get("hello3"), Some(&Value::String("world".to_string())));
+    }
+
+    #[test]
+    fn test_select_with_exclude() {
+        let processor = SelectProcessor {
+            fields: Fields::one(Field::new("hello", None)),
+            select_type: SelectType::Exclude,
+        };
+
+        let mut p = PipelineMap::new();
+        p.insert("hello".to_string(), Value::String("world".to_string()));
+        p.insert("hello2".to_string(), Value::String("world2".to_string()));
+
+        let result = processor.exec_mut(&mut p);
+        assert!(result.is_ok());
+        assert_eq!(p.len(), 1);
+        assert_eq!(p.get("hello"), None);
+        assert_eq!(p.get("hello2"), Some(&Value::String("world2".to_string())));
+    }
+}
diff --git a/src/pipeline/src/etl/processor/simple_extract.rs b/src/pipeline/src/etl/processor/simple_extract.rs
index 6015fc4591..a8c3b48cdd 100644
--- a/src/pipeline/src/etl/processor/simple_extract.rs
+++ b/src/pipeline/src/etl/processor/simple_extract.rs
@@ -18,7 +18,7 @@ use crate::error::{Error, KeyMustBeStringSnafu, ProcessorMissingFieldSnafu, Resu
 use crate::etl::field::Fields;
 use crate::etl::processor::{
     yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
-    IGNORE_MISSING_NAME, SIMPLE_EXTRACT_KEY_NAME,
+    IGNORE_MISSING_NAME, KEY_NAME,
 };
 use crate::{PipelineMap, Processor, Value};

@@ -55,8 +55,8 @@ impl TryFrom<&yaml_rust::yaml::Hash> for SimpleExtractProcessor {
             IGNORE_MISSING_NAME => {
                 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
             }
-            SIMPLE_EXTRACT_KEY_NAME => {
-                let key_str = yaml_string(v, SIMPLE_EXTRACT_KEY_NAME)?;
+            KEY_NAME => {
+                let key_str = yaml_string(v, KEY_NAME)?;
                 keys.extend(key_str.split(".").map(|s| s.to_string()));
             }
             _ => {}
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 8e5223fad5..a3470eef76 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -97,6 +97,7 @@ macro_rules! http_tests {
         test_test_pipeline_api,
         test_plain_text_ingestion,
         test_pipeline_auto_transform,
+        test_pipeline_auto_transform_with_select,
         test_identity_pipeline,
         test_identity_pipeline_with_flatten,
         test_identity_pipeline_with_custom_ts,
@@ -2377,27 +2378,6 @@ processors:
     assert_eq!(res.status(), StatusCode::OK);

-    let content = res.text().await;
-
-    let content = serde_json::from_str(&content);
-    assert!(content.is_ok());
-    // {"execution_time_ms":13,"pipelines":[{"name":"test","version":"2024-07-04 08:31:00.987136"}]}
-    let content: Value = content.unwrap();
-
-    let version_str = content
-        .get("pipelines")
-        .unwrap()
-        .as_array()
-        .unwrap()
-        .first()
-        .unwrap()
-        .get("version")
-        .unwrap()
-        .as_str()
-        .unwrap()
-        .to_string();
-    assert!(!version_str.is_empty());
-
     // 2. write data
     let data_body = r#"
 2024-05-25 20:16:37.217 404 hello
 2024-05-25 20:16:37.218 200 hello world"#;
@@ -2424,6 +2404,192 @@ processors:
     guard.remove_all().await;
 }

+pub async fn test_pipeline_auto_transform_with_select(store_type: StorageType) {
+    common_telemetry::init_default_ut_logging();
+    let (app, mut guard) =
+        setup_test_http_app_with_frontend(store_type, "test_pipeline_auto_transform_with_select")
+            .await;
+
+    // handshake
+    let client = TestClient::new(app).await;
+    let data_body = r#"
+2024-05-25 20:16:37.217 404 hello
+2024-05-25 20:16:37.218 200 hello world"#;
+
+    // select include
+    {
+        let body = r#"
+    processors:
+      - dissect:
+          fields:
+            - message
+          patterns:
+            - "%{+ts} %{+ts} %{http_status_code} %{content}"
+      - date:
+          fields:
+            - ts
+          formats:
+            - "%Y-%m-%d %H:%M:%S%.3f"
+      - select:
+          fields:
+            - ts
+            - http_status_code
+    "#;
+
+        // 1. create pipeline
+        let res = client
+            .post("/v1/pipelines/test")
+            .header("Content-Type", "application/x-yaml")
+            .body(body)
+            .send()
+            .await;
+
+        assert_eq!(res.status(), StatusCode::OK);
+
+        // 2. write data
+        let res = client
+            .post("/v1/ingest?db=public&table=logs1&pipeline_name=test")
+            .header("Content-Type", "text/plain")
+            .body(data_body)
+            .send()
+            .await;
+        assert_eq!(res.status(), StatusCode::OK);
+
+        // 3. select data
+        let expected = "[[1716668197217000000,\"404\"],[1716668197218000000,\"200\"]]";
+        validate_data(
+            "test_pipeline_auto_transform_with_select",
+            &client,
+            "select * from logs1",
+            expected,
+        )
+        .await;
+    }
+
+    // select include rename
+    {
+        let body = r#"
+    processors:
+      - dissect:
+          fields:
+            - message
+          patterns:
+            - "%{+ts} %{+ts} %{http_status_code} %{content}"
+      - date:
+          fields:
+            - ts
+          formats:
+            - "%Y-%m-%d %H:%M:%S%.3f"
+      - select:
+          fields:
+            - ts
+            - key: http_status_code
+              rename_to: s_code
+    "#;
+
+        // 1. create pipeline
+        let res = client
+            .post("/v1/pipelines/test2")
+            .header("Content-Type", "application/x-yaml")
+            .body(body)
+            .send()
+            .await;
+
+        assert_eq!(res.status(), StatusCode::OK);
+
+        // 2. write data
+        let res = client
+            .post("/v1/ingest?db=public&table=logs2&pipeline_name=test2")
+            .header("Content-Type", "text/plain")
+            .body(data_body)
+            .send()
+            .await;
+        assert_eq!(res.status(), StatusCode::OK);
+
+        // 3. check schema
+        let expected = "[[\"ts\",\"TimestampNanosecond\",\"PRI\",\"NO\",\"\",\"TIMESTAMP\"],[\"s_code\",\"String\",\"\",\"YES\",\"\",\"FIELD\"]]";
+        validate_data(
+            "test_pipeline_auto_transform_with_select_rename",
+            &client,
+            "desc table logs2",
+            expected,
+        )
+        .await;
+
+        // 4. check data
+        let expected = "[[1716668197217000000,\"404\"],[1716668197218000000,\"200\"]]";
+        validate_data(
+            "test_pipeline_auto_transform_with_select_rename",
+            &client,
+            "select * from logs2",
+            expected,
+        )
+        .await;
+    }
+
+    // select exclude
+    {
+        let body = r#"
+    processors:
+      - dissect:
+          fields:
+            - message
+          patterns:
+            - "%{+ts} %{+ts} %{http_status_code} %{content}"
+      - date:
+          fields:
+            - ts
+          formats:
+            - "%Y-%m-%d %H:%M:%S%.3f"
+      - select:
+          type: exclude
+          fields:
+            - http_status_code
+    "#;
+
+        // 1. create pipeline
+        let res = client
+            .post("/v1/pipelines/test3")
+            .header("Content-Type", "application/x-yaml")
+            .body(body)
+            .send()
+            .await;
+
+        assert_eq!(res.status(), StatusCode::OK);
+
+        // 2. write data
+        let res = client
+            .post("/v1/ingest?db=public&table=logs3&pipeline_name=test3")
+            .header("Content-Type", "text/plain")
+            .body(data_body)
+            .send()
+            .await;
+        assert_eq!(res.status(), StatusCode::OK);
+
+        // 3. check schema
+        let expected = "[[\"ts\",\"TimestampNanosecond\",\"PRI\",\"NO\",\"\",\"TIMESTAMP\"],[\"content\",\"String\",\"\",\"YES\",\"\",\"FIELD\"],[\"message\",\"String\",\"\",\"YES\",\"\",\"FIELD\"]]";
+        validate_data(
+            "test_pipeline_auto_transform_with_select_exclude",
+            &client,
+            "desc table logs3",
+            expected,
+        )
+        .await;
+
+        // 4. check data
+        let expected = "[[1716668197217000000,\"hello\",\"2024-05-25 20:16:37.217 404 hello\"],[1716668197218000000,\"hello world\",\"2024-05-25 20:16:37.218 200 hello world\"]]";
+        validate_data(
+            "test_pipeline_auto_transform_with_select_exclude",
+            &client,
+            "select * from logs3",
+            expected,
+        )
+        .await;
+    }
+
+    guard.remove_all().await;
+}
+
 pub async fn test_otlp_metrics(store_type: StorageType) {
     // init
     common_telemetry::init_default_ut_logging();