mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 21:02:58 +00:00
feat: add simple extract processor (#5688)
* feat: add simple extract processor * chore: add test * chore: add license header * chore: minor update
This commit is contained in:
@@ -24,6 +24,7 @@ pub mod join;
|
||||
pub mod json_path;
|
||||
pub mod letter;
|
||||
pub mod regex;
|
||||
pub mod simple_extract;
|
||||
pub mod timestamp;
|
||||
pub mod urlencoding;
|
||||
|
||||
@@ -51,6 +52,7 @@ use super::error::{
|
||||
use super::field::{Field, Fields};
|
||||
use super::PipelineMap;
|
||||
use crate::etl::error::{Error, Result};
|
||||
use crate::etl::processor::simple_extract::SimpleExtractProcessor;
|
||||
use crate::etl_error::UnsupportedProcessorSnafu;
|
||||
|
||||
const FIELD_NAME: &str = "field";
|
||||
@@ -63,6 +65,7 @@ const SEPARATOR_NAME: &str = "separator";
|
||||
const TARGET_FIELDS_NAME: &str = "target_fields";
|
||||
const JSON_PATH_NAME: &str = "json_path";
|
||||
const JSON_PATH_RESULT_INDEX_NAME: &str = "result_index";
|
||||
const SIMPLE_EXTRACT_KEY_NAME: &str = "key";
|
||||
|
||||
/// Processor trait defines the interface for all processors.
|
||||
///
|
||||
@@ -97,6 +100,7 @@ pub enum ProcessorKind {
|
||||
Epoch(EpochProcessor),
|
||||
Date(DateProcessor),
|
||||
JsonPath(JsonPathProcessor),
|
||||
SimpleJsonPath(SimpleExtractProcessor),
|
||||
Decolorize(DecolorizeProcessor),
|
||||
Digest(DigestProcessor),
|
||||
}
|
||||
@@ -174,6 +178,9 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorKind> {
|
||||
ProcessorKind::Decolorize(DecolorizeProcessor::try_from(value)?)
|
||||
}
|
||||
digest::PROCESSOR_DIGEST => ProcessorKind::Digest(DigestProcessor::try_from(value)?),
|
||||
simple_extract::PROCESSOR_SIMPLE_EXTRACT => {
|
||||
ProcessorKind::SimpleJsonPath(SimpleExtractProcessor::try_from(value)?)
|
||||
}
|
||||
_ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
|
||||
};
|
||||
|
||||
|
||||
148
src/pipeline/src/etl/processor/simple_extract.rs
Normal file
148
src/pipeline/src/etl/processor/simple_extract.rs
Normal file
@@ -0,0 +1,148 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use snafu::OptionExt as _;
|
||||
|
||||
use crate::etl::error::{Error, Result};
|
||||
use crate::etl::field::Fields;
|
||||
use crate::etl::processor::{
|
||||
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
|
||||
IGNORE_MISSING_NAME, SIMPLE_EXTRACT_KEY_NAME,
|
||||
};
|
||||
use crate::etl_error::{KeyMustBeStringSnafu, ProcessorMissingFieldSnafu};
|
||||
use crate::{PipelineMap, Processor, Value};
|
||||
|
||||
pub(crate) const PROCESSOR_SIMPLE_EXTRACT: &str = "simple_extract";
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct SimpleExtractProcessor {
|
||||
fields: Fields,
|
||||
/// simple keys to extract nested JSON field
|
||||
/// key `a.b` is saved as ['a', 'b'], each key represents a level of the JSON tree
|
||||
key: Vec<String>,
|
||||
ignore_missing: bool,
|
||||
}
|
||||
|
||||
impl TryFrom<&yaml_rust::yaml::Hash> for SimpleExtractProcessor {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
|
||||
let mut fields = Fields::default();
|
||||
let mut ignore_missing = false;
|
||||
let mut keys = vec![];
|
||||
|
||||
for (k, v) in value.iter() {
|
||||
let key = k
|
||||
.as_str()
|
||||
.with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
|
||||
match key {
|
||||
FIELD_NAME => {
|
||||
fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
|
||||
}
|
||||
FIELDS_NAME => {
|
||||
fields = yaml_new_fields(v, FIELDS_NAME)?;
|
||||
}
|
||||
IGNORE_MISSING_NAME => {
|
||||
ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
|
||||
}
|
||||
SIMPLE_EXTRACT_KEY_NAME => {
|
||||
let key_str = yaml_string(v, SIMPLE_EXTRACT_KEY_NAME)?;
|
||||
keys.extend(key_str.split(".").map(|s| s.to_string()));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
let processor = SimpleExtractProcessor {
|
||||
fields,
|
||||
key: keys,
|
||||
ignore_missing,
|
||||
};
|
||||
|
||||
Ok(processor)
|
||||
}
|
||||
}
|
||||
|
||||
impl SimpleExtractProcessor {
|
||||
fn process_field(&self, val: &Value) -> Result<Value> {
|
||||
let mut current = val;
|
||||
for key in self.key.iter() {
|
||||
let Value::Map(map) = current else {
|
||||
return Ok(Value::Null);
|
||||
};
|
||||
let Some(v) = map.get(key) else {
|
||||
return Ok(Value::Null);
|
||||
};
|
||||
current = v;
|
||||
}
|
||||
Ok(current.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl Processor for SimpleExtractProcessor {
|
||||
fn kind(&self) -> &str {
|
||||
PROCESSOR_SIMPLE_EXTRACT
|
||||
}
|
||||
|
||||
fn ignore_missing(&self) -> bool {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
Some(v) => {
|
||||
let processed = self.process_field(v)?;
|
||||
let output_index = field.target_or_input_field();
|
||||
val.insert(output_index.to_string(), processed);
|
||||
}
|
||||
None => {
|
||||
if !self.ignore_missing {
|
||||
return ProcessorMissingFieldSnafu {
|
||||
processor: self.kind(),
|
||||
field: field.input_field(),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
|
||||
#[test]
|
||||
fn test_json_path() {
|
||||
use super::*;
|
||||
use crate::{Map, Value};
|
||||
|
||||
let processor = SimpleExtractProcessor {
|
||||
key: vec!["hello".to_string()],
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let result = processor
|
||||
.process_field(&Value::Map(Map::one(
|
||||
"hello",
|
||||
Value::String("world".to_string()),
|
||||
)))
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(result, Value::String("world".to_string()));
|
||||
}
|
||||
}
|
||||
69
src/pipeline/tests/simple_extract.rs
Normal file
69
src/pipeline/tests/simple_extract.rs
Normal file
@@ -0,0 +1,69 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod common;
|
||||
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::{ColumnDataType, ColumnSchema, SemanticType};
|
||||
use lazy_static::lazy_static;
|
||||
|
||||
lazy_static! {
|
||||
static ref EXPECTED_SCHEMA: Vec<ColumnSchema> = vec![
|
||||
common::make_column_schema(
|
||||
"commit_author".to_string(),
|
||||
ColumnDataType::String,
|
||||
SemanticType::Field,
|
||||
),
|
||||
common::make_column_schema(
|
||||
"greptime_timestamp".to_string(),
|
||||
ColumnDataType::TimestampNanosecond,
|
||||
SemanticType::Timestamp,
|
||||
),
|
||||
];
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_gsub() {
|
||||
let input_value_str = r#"
|
||||
[
|
||||
{
|
||||
"commit": {
|
||||
"commitTime": "1573840000.000",
|
||||
"commitAuthor": "test"
|
||||
}
|
||||
}
|
||||
]
|
||||
"#;
|
||||
|
||||
let pipeline_yaml = r#"
|
||||
---
|
||||
processors:
|
||||
- simple_extract:
|
||||
field: commit, commit_author
|
||||
key: "commitAuthor"
|
||||
|
||||
transform:
|
||||
- field: commit_author
|
||||
type: string
|
||||
"#;
|
||||
|
||||
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
|
||||
|
||||
assert_eq!(output.schema, *EXPECTED_SCHEMA);
|
||||
|
||||
assert_eq!(
|
||||
output.rows[0].values[0].value_data,
|
||||
Some(ValueData::StringValue("test".to_string()))
|
||||
);
|
||||
}
|
||||
Reference in New Issue
Block a user