feat(pipeline): select processor (#6019)

* feat: support auto transform

* refactor: replace hashbrown with ahash

* refactor: params of run identity pipeline

* refactor: minor update

* test: add test for auto transform

* feat: add select processor

* test: select processor

* chore: use include and exclude for key

* fix: typos

* chore: address CR comment

* chore: typo

* chore: typo

* chore: address CR comment

* chore: use with_context
This commit is contained in:
shuiyisong
2025-05-07 11:40:11 +08:00
committed by GitHub
parent df31f0b9ec
commit 56f31d5933
5 changed files with 445 additions and 37 deletions

View File

@@ -35,6 +35,16 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Field renaming must be a string pair of 'key' and 'rename_to', got: {value:?}"
))]
InvalidFieldRename {
value: yaml_rust::Yaml,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Processor must be a map"))]
ProcessorMustBeMap {
#[snafu(implicit)]
@@ -748,6 +758,7 @@ impl ErrorExt for Error {
EmptyInputField { .. }
| MissingInputField { .. }
| InvalidFieldRename { .. }
| ProcessorMustBeMap { .. }
| ProcessorMissingField { .. }
| ProcessorExpectString { .. }

View File

@@ -25,10 +25,13 @@ pub mod json_parse;
pub mod json_path;
pub mod letter;
pub mod regex;
pub mod select;
pub mod simple_extract;
pub mod timestamp;
pub mod urlencoding;
use std::str::FromStr;
use cmcd::CmcdProcessor;
use csv::CsvProcessor;
use date::DateProcessor;
@@ -47,11 +50,13 @@ use timestamp::TimestampProcessor;
use urlencoding::UrlEncodingProcessor;
use crate::error::{
Error, FailedParseFieldFromStringSnafu, FieldMustBeTypeSnafu, ProcessorKeyMustBeStringSnafu,
ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu, Result, UnsupportedProcessorSnafu,
Error, FailedParseFieldFromStringSnafu, FieldMustBeTypeSnafu, InvalidFieldRenameSnafu,
ProcessorKeyMustBeStringSnafu, ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu,
Result, UnsupportedProcessorSnafu,
};
use crate::etl::field::{Field, Fields};
use crate::etl::processor::json_parse::JsonParseProcessor;
use crate::etl::processor::select::SelectProcessor;
use crate::etl::processor::simple_extract::SimpleExtractProcessor;
use crate::etl::PipelineMap;
@@ -65,7 +70,43 @@ const SEPARATOR_NAME: &str = "separator";
const TARGET_FIELDS_NAME: &str = "target_fields";
const JSON_PATH_NAME: &str = "json_path";
const JSON_PATH_RESULT_INDEX_NAME: &str = "result_index";
const SIMPLE_EXTRACT_KEY_NAME: &str = "key";
const KEY_NAME: &str = "key";
const TYPE_NAME: &str = "type";
const RENAME_TO_KEY: &str = "rename_to";
/// Macro to extract a string value from a YAML map
///
/// Expands to an expression of type `Result<&str>`: it looks up `$key` in the
/// YAML hash `$map`, keeps the value only if it is a YAML string, and otherwise
/// fails with `InvalidFieldRenameSnafu`, attaching a clone of `$value` for context.
///
/// NOTE(review): the expansion references `InvalidFieldRenameSnafu` and the
/// snafu `with_context` extension unqualified, so every call site must have
/// both in scope — confirm whether `$crate::…` paths were intended instead.
#[macro_export]
macro_rules! yaml_map_get_str {
    ($map:expr, $key:expr, $value:expr) => {
        $map.get(&yaml_rust::Yaml::String($key.to_string()))
            .and_then(|v| v.as_str())
            .with_context(|| InvalidFieldRenameSnafu {
                value: $value.clone(),
            })
    };
}
lazy_static::lazy_static! {
    /// Conversion function for `yaml_list`: turns a YAML scalar into an owned
    /// `String`. Non-string YAML values silently become the empty string
    /// (`as_str()` yields `None`, then `unwrap_or_default`) — this preserves
    /// the previous behavior of `yaml_strings`; it never errors.
    static ref STRING_FN: fn(&str, &yaml_rust::Yaml) -> Result<String> = |_, v| {
        Ok(v.as_str().unwrap_or_default().into())
    };
    /// Conversion function for `yaml_list`: turns a YAML item into a `Field`.
    /// - a plain string is parsed via `Field::from_str`
    /// - a `{ key: …, rename_to: … }` hash builds a renamed field; both entries
    ///   must be strings or `InvalidFieldRename` is raised by the macro
    /// - any other YAML type is rejected with a `FieldMustBeType` error
    static ref STRING_OR_HASH_FN: fn(&str, &yaml_rust::Yaml) -> Result<Field> = |field, v| {
        match v {
            yaml_rust::Yaml::String(s) => Field::from_str(s),
            yaml_rust::Yaml::Hash(m) => {
                let key = yaml_map_get_str!(m, KEY_NAME, v)?;
                let rename_to = yaml_map_get_str!(m, RENAME_TO_KEY, v)?;
                Ok(Field::new(key, Some(rename_to.to_string())))
            }
            _ => FieldMustBeTypeSnafu {
                field,
                ty: "string or key-rename_to map",
            }
            .fail(),
        }
    };
}
/// Processor trait defines the interface for all processors.
///
@@ -104,6 +145,7 @@ pub enum ProcessorKind {
SimpleJsonPath(SimpleExtractProcessor),
Decolorize(DecolorizeProcessor),
Digest(DigestProcessor),
Select(SelectProcessor),
}
#[derive(Debug, Default)]
@@ -185,6 +227,7 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorKind> {
json_parse::PROCESSOR_JSON_PARSE => {
ProcessorKind::JsonParse(JsonParseProcessor::try_from(value)?)
}
select::PROCESSOR_SELECT => ProcessorKind::Select(SelectProcessor::try_from(value)?),
_ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
};
@@ -201,16 +244,19 @@ pub(crate) fn yaml_string(v: &yaml_rust::Yaml, field: &str) -> Result<String> {
}
pub(crate) fn yaml_strings(v: &yaml_rust::Yaml, field: &str) -> Result<Vec<String>> {
let vec = v
.as_vec()
.context(FieldMustBeTypeSnafu {
field,
ty: "list of string",
})?
yaml_list(v, *STRING_FN, field)
}
pub(crate) fn yaml_list<T>(
v: &yaml_rust::Yaml,
conv_fn: impl Fn(&str, &yaml_rust::Yaml) -> Result<T>,
field: &str,
) -> Result<Vec<T>> {
v.as_vec()
.context(FieldMustBeTypeSnafu { field, ty: "list" })?
.iter()
.map(|v| v.as_str().unwrap_or_default().into())
.collect();
Ok(vec)
.map(|v| conv_fn(field, v))
.collect()
}
pub(crate) fn yaml_bool(v: &yaml_rust::Yaml, field: &str) -> Result<bool> {
@@ -248,7 +294,7 @@ where
}
pub(crate) fn yaml_new_fields(v: &yaml_rust::Yaml, field: &str) -> Result<Fields> {
yaml_parse_strings(v, field).map(Fields::new)
yaml_list(v, *STRING_OR_HASH_FN, field).map(Fields::new)
}
pub(crate) fn yaml_new_field(v: &yaml_rust::Yaml, field: &str) -> Result<Field> {

View File

@@ -0,0 +1,185 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use ahash::{HashSet, HashSetExt};
use snafu::OptionExt;
use crate::error::{Error, KeyMustBeStringSnafu, ProcessorUnsupportedValueSnafu, Result};
use crate::etl::field::Fields;
use crate::etl::processor::{
yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME, TYPE_NAME,
};
use crate::{PipelineMap, Processor};
pub(crate) const PROCESSOR_SELECT: &str = "select";
const INCLUDE_KEY: &str = "include";
const EXCLUDE_KEY: &str = "exclude";
/// Selection semantics of the `select` processor: keep only the configured
/// fields (`Include`, the default) or drop the configured fields (`Exclude`).
#[derive(Debug, Default)]
pub enum SelectType {
    #[default]
    Include,
    Exclude,
}
impl TryFrom<String> for SelectType {
    type Error = Error;

    /// Parses the `type` option of a `select` processor. Accepts exactly
    /// `include` or `exclude`; anything else is reported as an unsupported
    /// value for this processor.
    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
        if value == INCLUDE_KEY {
            return Ok(SelectType::Include);
        }
        if value == EXCLUDE_KEY {
            return Ok(SelectType::Exclude);
        }
        ProcessorUnsupportedValueSnafu {
            processor: PROCESSOR_SELECT.to_string(),
            val: format!("'{}', expect '{}' or '{}'", value, INCLUDE_KEY, EXCLUDE_KEY),
        }
        .fail()
    }
}
/// Processor that filters the pipeline map down to (or strips it of) a
/// configured set of fields; include entries may also rename a field.
#[derive(Debug, Default)]
pub struct SelectProcessor {
    // Fields to include/exclude; an include entry may carry a rename target.
    fields: Fields,
    // Include (default) or exclude semantics.
    select_type: SelectType,
}
impl TryFrom<&yaml_rust::yaml::Hash> for SelectProcessor {
    type Error = Error;

    /// Builds a `SelectProcessor` from its YAML configuration map.
    ///
    /// Recognized keys: `field` (single field), `fields` (list of fields,
    /// each a string or a `key`/`rename_to` map), and `type`
    /// (`include`/`exclude`, defaults to include). Unknown keys are ignored;
    /// non-string keys are rejected with `KeyMustBeString`.
    fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
        let mut fields = Fields::default();
        let mut select_type = SelectType::default();
        for (k, v) in value.iter() {
            let key = k
                .as_str()
                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
            match key {
                FIELD_NAME => {
                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
                }
                FIELDS_NAME => {
                    fields = yaml_new_fields(v, FIELDS_NAME)?;
                }
                TYPE_NAME => {
                    select_type = SelectType::try_from(yaml_string(v, TYPE_NAME)?)?;
                }
                // Be lenient: silently skip unrecognized configuration keys.
                _ => {}
            }
        }
        Ok(SelectProcessor {
            fields,
            select_type,
        })
    }
}
impl Processor for SelectProcessor {
    fn kind(&self) -> &str {
        PROCESSOR_SELECT
    }

    /// Missing inputs are never an error for `select`: including an absent key
    /// simply selects nothing, and excluding an absent key is a no-op.
    fn ignore_missing(&self) -> bool {
        true
    }

    /// Applies the selection to `val` in place.
    ///
    /// - `Include`: for each configured field, optionally move its value to the
    ///   rename target, then drop every key that is not in the configured set.
    /// - `Exclude`: remove the configured keys; rename targets are ignored.
    fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
        match self.select_type {
            SelectType::Include => {
                // The set holds at most one key per configured field, so size it
                // from the field count rather than the (typically larger) map.
                let mut include_key_set = HashSet::with_capacity(self.fields.iter().count());
                for field in self.fields.iter() {
                    // If the field has a target, move the value to the target
                    let field_name = field.input_field();
                    if let Some(target_name) = field.target_field() {
                        if let Some(v) = val.remove(field_name) {
                            val.insert(target_name.to_string(), v);
                        }
                        include_key_set.insert(target_name);
                    } else {
                        include_key_set.insert(field_name);
                    }
                }
                val.retain(|k, _| include_key_set.contains(k.as_str()));
            }
            SelectType::Exclude => {
                for field in self.fields.iter() {
                    val.remove(field.input_field());
                }
            }
        }
        Ok(())
    }
}
#[cfg(test)]
mod test {
    use crate::etl::field::{Field, Fields};
    use crate::etl::processor::select::{SelectProcessor, SelectType};
    use crate::{PipelineMap, Processor, Value};

    /// Builds the shared fixture: hello -> "world", hello2 -> "world2".
    fn sample_map() -> PipelineMap {
        let mut map = PipelineMap::new();
        map.insert("hello".to_string(), Value::String("world".to_string()));
        map.insert("hello2".to_string(), Value::String("world2".to_string()));
        map
    }

    #[test]
    fn test_select() {
        // Include a single key: everything else must be dropped.
        let processor = SelectProcessor {
            fields: Fields::one(Field::new("hello", None)),
            select_type: SelectType::Include,
        };

        let mut map = sample_map();
        assert!(processor.exec_mut(&mut map).is_ok());
        assert_eq!(map.len(), 1);
        assert_eq!(map.get("hello"), Some(&Value::String("world".to_string())));
    }

    #[test]
    fn test_select_with_target() {
        // Include with a rename: the value must move to the target key.
        let processor = SelectProcessor {
            fields: Fields::one(Field::new("hello", Some("hello3".to_string()))),
            select_type: SelectType::Include,
        };

        let mut map = sample_map();
        assert!(processor.exec_mut(&mut map).is_ok());
        assert_eq!(map.len(), 1);
        assert_eq!(map.get("hello3"), Some(&Value::String("world".to_string())));
    }

    #[test]
    fn test_select_with_exclude() {
        // Exclude a single key: only the other entry survives.
        let processor = SelectProcessor {
            fields: Fields::one(Field::new("hello", None)),
            select_type: SelectType::Exclude,
        };

        let mut map = sample_map();
        assert!(processor.exec_mut(&mut map).is_ok());
        assert_eq!(map.len(), 1);
        assert_eq!(map.get("hello"), None);
        assert_eq!(map.get("hello2"), Some(&Value::String("world2".to_string())));
    }
}

View File

@@ -18,7 +18,7 @@ use crate::error::{Error, KeyMustBeStringSnafu, ProcessorMissingFieldSnafu, Resu
use crate::etl::field::Fields;
use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
IGNORE_MISSING_NAME, SIMPLE_EXTRACT_KEY_NAME,
IGNORE_MISSING_NAME, KEY_NAME,
};
use crate::{PipelineMap, Processor, Value};
@@ -55,8 +55,8 @@ impl TryFrom<&yaml_rust::yaml::Hash> for SimpleExtractProcessor {
IGNORE_MISSING_NAME => {
ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
}
SIMPLE_EXTRACT_KEY_NAME => {
let key_str = yaml_string(v, SIMPLE_EXTRACT_KEY_NAME)?;
KEY_NAME => {
let key_str = yaml_string(v, KEY_NAME)?;
keys.extend(key_str.split(".").map(|s| s.to_string()));
}
_ => {}

View File

@@ -97,6 +97,7 @@ macro_rules! http_tests {
test_test_pipeline_api,
test_plain_text_ingestion,
test_pipeline_auto_transform,
test_pipeline_auto_transform_with_select,
test_identity_pipeline,
test_identity_pipeline_with_flatten,
test_identity_pipeline_with_custom_ts,
@@ -2377,27 +2378,6 @@ processors:
assert_eq!(res.status(), StatusCode::OK);
let content = res.text().await;
let content = serde_json::from_str(&content);
assert!(content.is_ok());
// {"execution_time_ms":13,"pipelines":[{"name":"test","version":"2024-07-04 08:31:00.987136"}]}
let content: Value = content.unwrap();
let version_str = content
.get("pipelines")
.unwrap()
.as_array()
.unwrap()
.first()
.unwrap()
.get("version")
.unwrap()
.as_str()
.unwrap()
.to_string();
assert!(!version_str.is_empty());
// 2. write data
let data_body = r#"
2024-05-25 20:16:37.217 404 hello
@@ -2424,6 +2404,192 @@ processors:
guard.remove_all().await;
}
/// End-to-end HTTP test for the `select` processor in auto-transform
/// pipelines: creates a pipeline, ingests two plain-text log lines, then
/// validates the stored schema/data for three variants — include, include
/// with rename, and exclude.
///
/// NOTE(review): the raw-string YAML bodies below appear with their original
/// indentation stripped by the diff rendering — verify nesting against the
/// repository before reuse.
pub async fn test_pipeline_auto_transform_with_select(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_http_app_with_frontend(store_type, "test_pipeline_auto_transform_with_select")
.await;
// handshake
let client = TestClient::new(app).await;
// Two log lines: timestamp, status code, free-text content.
let data_body = r#"
2024-05-25 20:16:37.217 404 hello
2024-05-25 20:16:37.218 200 hello world"#;
// select include
{
let body = r#"
processors:
- dissect:
fields:
- message
patterns:
- "%{+ts} %{+ts} %{http_status_code} %{content}"
- date:
fields:
- ts
formats:
- "%Y-%m-%d %H:%M:%S%.3f"
- select:
fields:
- ts
- http_status_code
"#;
// 1. create pipeline
let res = client
.post("/v1/pipelines/test")
.header("Content-Type", "application/x-yaml")
.body(body)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// 2. write data
let res = client
.post("/v1/ingest?db=public&table=logs1&pipeline_name=test")
.header("Content-Type", "text/plain")
.body(data_body)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// 3. select data
// Only the included columns (ts, http_status_code) should be present.
let expected = "[[1716668197217000000,\"404\"],[1716668197218000000,\"200\"]]";
validate_data(
"test_pipeline_auto_transform_with_select",
&client,
"select * from logs1",
expected,
)
.await;
}
// select include rename
{
let body = r#"
processors:
- dissect:
fields:
- message
patterns:
- "%{+ts} %{+ts} %{http_status_code} %{content}"
- date:
fields:
- ts
formats:
- "%Y-%m-%d %H:%M:%S%.3f"
- select:
fields:
- ts
- key: http_status_code
rename_to: s_code
"#;
// 1. create pipeline
let res = client
.post("/v1/pipelines/test2")
.header("Content-Type", "application/x-yaml")
.body(body)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// 2. write data
let res = client
.post("/v1/ingest?db=public&table=logs2&pipeline_name=test2")
.header("Content-Type", "text/plain")
.body(data_body)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// 3. check schema
// http_status_code must surface under its renamed column `s_code`.
let expected = "[[\"ts\",\"TimestampNanosecond\",\"PRI\",\"NO\",\"\",\"TIMESTAMP\"],[\"s_code\",\"String\",\"\",\"YES\",\"\",\"FIELD\"]]";
validate_data(
"test_pipeline_auto_transform_with_select_rename",
&client,
"desc table logs2",
expected,
)
.await;
// 4. check data
let expected = "[[1716668197217000000,\"404\"],[1716668197218000000,\"200\"]]";
validate_data(
"test_pipeline_auto_transform_with_select_rename",
&client,
"select * from logs2",
expected,
)
.await;
}
// select exclude
{
let body = r#"
processors:
- dissect:
fields:
- message
patterns:
- "%{+ts} %{+ts} %{http_status_code} %{content}"
- date:
fields:
- ts
formats:
- "%Y-%m-%d %H:%M:%S%.3f"
- select:
type: exclude
fields:
- http_status_code
"#;
// 1. create pipeline
let res = client
.post("/v1/pipelines/test3")
.header("Content-Type", "application/x-yaml")
.body(body)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// 2. write data
let res = client
.post("/v1/ingest?db=public&table=logs3&pipeline_name=test3")
.header("Content-Type", "text/plain")
.body(data_body)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// 3. check schema
// Excluded column http_status_code must be gone; all others remain.
// NOTE(review): the validation labels below still say
// "…_with_select_rename" — looks like a copy-paste from the rename case.
let expected = "[[\"ts\",\"TimestampNanosecond\",\"PRI\",\"NO\",\"\",\"TIMESTAMP\"],[\"content\",\"String\",\"\",\"YES\",\"\",\"FIELD\"],[\"message\",\"String\",\"\",\"YES\",\"\",\"FIELD\"]]";
validate_data(
"test_pipeline_auto_transform_with_select_rename",
&client,
"desc table logs3",
expected,
)
.await;
// 4. check data
let expected = "[[1716668197217000000,\"hello\",\"2024-05-25 20:16:37.217 404 hello\"],[1716668197218000000,\"hello world\",\"2024-05-25 20:16:37.218 200 hello world\"]]";
validate_data(
"test_pipeline_auto_transform_with_select_rename",
&client,
"select * from logs3",
expected,
)
.await;
}
guard.remove_all().await;
}
pub async fn test_otlp_metrics(store_type: StorageType) {
// init
common_telemetry::init_default_ut_logging();