diff --git a/src/pipeline/src/etl/processor.rs b/src/pipeline/src/etl/processor.rs
index e09e5bdc05..eeb5a30e9c 100644
--- a/src/pipeline/src/etl/processor.rs
+++ b/src/pipeline/src/etl/processor.rs
@@ -24,6 +24,7 @@ pub mod join;
 pub mod json_path;
 pub mod letter;
 pub mod regex;
+pub mod simple_extract;
 pub mod timestamp;
 pub mod urlencoding;
 
@@ -51,6 +52,7 @@ use super::error::{
 use super::field::{Field, Fields};
 use super::PipelineMap;
 use crate::etl::error::{Error, Result};
+use crate::etl::processor::simple_extract::SimpleExtractProcessor;
 use crate::etl_error::UnsupportedProcessorSnafu;
 
 const FIELD_NAME: &str = "field";
@@ -63,6 +65,7 @@ const SEPARATOR_NAME: &str = "separator";
 const TARGET_FIELDS_NAME: &str = "target_fields";
 const JSON_PATH_NAME: &str = "json_path";
 const JSON_PATH_RESULT_INDEX_NAME: &str = "result_index";
+const SIMPLE_EXTRACT_KEY_NAME: &str = "key";
 
 /// Processor trait defines the interface for all processors.
 ///
@@ -97,6 +100,7 @@ pub enum ProcessorKind {
     Epoch(EpochProcessor),
     Date(DateProcessor),
     JsonPath(JsonPathProcessor),
+    SimpleJsonPath(SimpleExtractProcessor),
     Decolorize(DecolorizeProcessor),
     Digest(DigestProcessor),
 }
@@ -174,6 +178,9 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorKind> {
             ProcessorKind::Decolorize(DecolorizeProcessor::try_from(value)?)
         }
         digest::PROCESSOR_DIGEST => ProcessorKind::Digest(DigestProcessor::try_from(value)?),
+        simple_extract::PROCESSOR_SIMPLE_EXTRACT => {
+            ProcessorKind::SimpleJsonPath(SimpleExtractProcessor::try_from(value)?)
+        }
         _ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
     };
 
diff --git a/src/pipeline/src/etl/processor/simple_extract.rs b/src/pipeline/src/etl/processor/simple_extract.rs
new file mode 100644
index 0000000000..6054120c1b
--- /dev/null
+++ b/src/pipeline/src/etl/processor/simple_extract.rs
@@ -0,0 +1,203 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! `simple_extract` processor: extracts a nested value from a map-typed field by
+//! walking a dot-separated key path.
+
+use snafu::OptionExt as _;
+
+use crate::etl::error::{Error, Result};
+use crate::etl::field::Fields;
+use crate::etl::processor::{
+    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
+    IGNORE_MISSING_NAME, SIMPLE_EXTRACT_KEY_NAME,
+};
+use crate::etl_error::{KeyMustBeStringSnafu, ProcessorMissingFieldSnafu};
+use crate::{PipelineMap, Processor, Value};
+
+pub(crate) const PROCESSOR_SIMPLE_EXTRACT: &str = "simple_extract";
+
+/// Processor that looks up a nested key path inside a map value.
+///
+/// For every configured field the extracted value is written back into the pipeline
+/// map under the name returned by `target_or_input_field()`.
+#[derive(Debug, Default)]
+pub struct SimpleExtractProcessor {
+    /// Fields this processor reads from.
+    fields: Fields,
+    /// Path segments used to descend into nested maps: the configured key `a.b` is
+    /// stored as `["a", "b"]`, one segment per nesting level.
+    key: Vec<String>,
+    /// When `true`, an absent input field is skipped instead of raising an error.
+    ignore_missing: bool,
+}
+
+impl TryFrom<&yaml_rust::yaml::Hash> for SimpleExtractProcessor {
+    type Error = Error;
+
+    /// Builds the processor from its YAML configuration.
+    ///
+    /// Recognized options are `field`/`fields`, `ignore_missing` and `key`; any other
+    /// option is ignored. If `key` is absent the path is empty and the input value is
+    /// copied unchanged.
+    ///
+    /// # Errors
+    ///
+    /// Fails when an option name is not a string, or when one of the `yaml_*` helpers
+    /// rejects an option value.
+    fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
+        let mut fields = Fields::default();
+        let mut ignore_missing = false;
+        let mut keys = vec![];
+
+        for (k, v) in value.iter() {
+            let key = k
+                .as_str()
+                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
+            match key {
+                FIELD_NAME => {
+                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
+                }
+                FIELDS_NAME => {
+                    fields = yaml_new_fields(v, FIELDS_NAME)?;
+                }
+                IGNORE_MISSING_NAME => {
+                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
+                }
+                SIMPLE_EXTRACT_KEY_NAME => {
+                    let key_str = yaml_string(v, SIMPLE_EXTRACT_KEY_NAME)?;
+                    keys.extend(key_str.split('.').map(String::from));
+                }
+                _ => {}
+            }
+        }
+
+        Ok(SimpleExtractProcessor {
+            fields,
+            key: keys,
+            ignore_missing,
+        })
+    }
+}
+
+impl SimpleExtractProcessor {
+    /// Follows `self.key` through nested maps starting at `val`.
+    ///
+    /// Returns a clone of the value at the end of the path, or `Value::Null` as soon
+    /// as a segment is missing or the current value is not a map.
+    fn process_field(&self, val: &Value) -> Result<Value> {
+        let mut current = val;
+        for key in self.key.iter() {
+            let Value::Map(map) = current else {
+                return Ok(Value::Null);
+            };
+            let Some(v) = map.get(key) else {
+                return Ok(Value::Null);
+            };
+            current = v;
+        }
+        Ok(current.clone())
+    }
+}
+
+impl Processor for SimpleExtractProcessor {
+    fn kind(&self) -> &str {
+        PROCESSOR_SIMPLE_EXTRACT
+    }
+
+    fn ignore_missing(&self) -> bool {
+        self.ignore_missing
+    }
+
+    /// Runs the extraction for every configured field.
+    ///
+    /// # Errors
+    ///
+    /// Returns a missing-field error when an input field is absent and
+    /// `ignore_missing` is `false`.
+    fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
+        for field in self.fields.iter() {
+            let index = field.input_field();
+            match val.get(index) {
+                Some(v) => {
+                    let processed = self.process_field(v)?;
+                    let output_index = field.target_or_input_field();
+                    val.insert(output_index.to_string(), processed);
+                }
+                None => {
+                    if !self.ignore_missing {
+                        return ProcessorMissingFieldSnafu {
+                            processor: self.kind(),
+                            field: index,
+                        }
+                        .fail();
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::Map;
+
+    #[test]
+    fn test_simple_extract() {
+        let processor = SimpleExtractProcessor {
+            key: vec!["hello".to_string()],
+            ..Default::default()
+        };
+
+        let result = processor
+            .process_field(&Value::Map(Map::one(
+                "hello",
+                Value::String("world".to_string()),
+            )))
+            .unwrap();
+
+        assert_eq!(result, Value::String("world".to_string()));
+    }
+
+    #[test]
+    fn test_simple_extract_nested() {
+        let processor = SimpleExtractProcessor {
+            key: vec!["commit".to_string(), "author".to_string()],
+            ..Default::default()
+        };
+
+        let input = Value::Map(Map::one(
+            "commit",
+            Value::Map(Map::one("author", Value::String("test".to_string()))),
+        ));
+        let result = processor.process_field(&input).unwrap();
+
+        assert_eq!(result, Value::String("test".to_string()));
+    }
+
+    #[test]
+    fn test_simple_extract_missing_key() {
+        let processor = SimpleExtractProcessor {
+            key: vec!["missing".to_string()],
+            ..Default::default()
+        };
+
+        let input = Value::Map(Map::one("hello", Value::String("world".to_string())));
+        let result = processor.process_field(&input).unwrap();
+
+        assert_eq!(result, Value::Null);
+    }
+}
diff --git a/src/pipeline/tests/simple_extract.rs b/src/pipeline/tests/simple_extract.rs
new file mode 100644
index 0000000000..5e989f0365
--- /dev/null
+++ b/src/pipeline/tests/simple_extract.rs
@@ -0,0 +1,69 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod common;
+
+use api::v1::value::ValueData;
+use api::v1::{ColumnDataType, ColumnSchema, SemanticType};
+use lazy_static::lazy_static;
+
+lazy_static! {
+    static ref EXPECTED_SCHEMA: Vec<ColumnSchema> = vec![
+        common::make_column_schema(
+            "commit_author".to_string(),
+            ColumnDataType::String,
+            SemanticType::Field,
+        ),
+        common::make_column_schema(
+            "greptime_timestamp".to_string(),
+            ColumnDataType::TimestampNanosecond,
+            SemanticType::Timestamp,
+        ),
+    ];
+}
+
+#[test]
+fn test_simple_extract() {
+    let input_value_str = r#"
+    [
+        {
+            "commit": {
+                "commitTime": "1573840000.000",
+                "commitAuthor": "test"
+            }
+        }
+    ]
+"#;
+
+    let pipeline_yaml = r#"
+---
+processors:
+  - simple_extract:
+      field: commit, commit_author
+      key: "commitAuthor"
+
+transform:
+  - field: commit_author
+    type: string
+"#;
+
+    let output = common::parse_and_exec(input_value_str, pipeline_yaml);
+
+    assert_eq!(output.schema, *EXPECTED_SCHEMA);
+
+    assert_eq!(
+        output.rows[0].values[0].value_data,
+        Some(ValueData::StringValue("test".to_string()))
+    );
+}