diff --git a/src/pipeline/src/etl/transform.rs b/src/pipeline/src/etl/transform.rs index 14cfa440fb..27afeaa7de 100644 --- a/src/pipeline/src/etl/transform.rs +++ b/src/pipeline/src/etl/transform.rs @@ -18,6 +18,7 @@ pub mod transformer; use snafu::OptionExt; use crate::etl::error::{Error, Result}; +use crate::etl::processor::yaml_bool; use crate::etl::transform::index::Index; use crate::etl::value::Value; @@ -25,6 +26,7 @@ const TRANSFORM_FIELD: &str = "field"; const TRANSFORM_FIELDS: &str = "fields"; const TRANSFORM_TYPE: &str = "type"; const TRANSFORM_INDEX: &str = "index"; +const TRANSFORM_TAG: &str = "tag"; const TRANSFORM_DEFAULT: &str = "default"; const TRANSFORM_ON_FAILURE: &str = "on_failure"; @@ -144,6 +146,8 @@ pub struct Transform { pub index: Option, + pub tag: bool, + pub on_failure: Option, } @@ -154,6 +158,7 @@ impl Default for Transform { type_: Value::Null, default: None, index: None, + tag: false, on_failure: None, } } @@ -185,6 +190,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for Transform { let mut type_ = Value::Null; let mut default = None; let mut index = None; + let mut tag = false; let mut on_failure = None; for (k, v) in hash { @@ -210,6 +216,10 @@ impl TryFrom<&yaml_rust::yaml::Hash> for Transform { index = Some(index_str.try_into()?); } + TRANSFORM_TAG => { + tag = yaml_bool(v, TRANSFORM_TAG)?; + } + TRANSFORM_DEFAULT => { default = Some(Value::try_from(v)?); } @@ -247,6 +257,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for Transform { default: final_default, index, on_failure, + tag, }; Ok(builder) diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index 0211e67db1..7b4dab958a 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -96,6 +96,7 @@ impl GreptimeTransformer { default, index: Some(Index::Time), on_failure: Some(crate::etl::transform::OnFailure::Default), + tag: false, }; transforms.push(transform); } diff --git a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs index e6e315e175..97cefe48ff 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs @@ -95,6 +95,10 @@ pub(crate) fn coerce_columns(transform: &Transform) -> Result> } fn coerce_semantic_type(transform: &Transform) -> SemanticType { + if transform.tag { + return SemanticType::Tag; + } + match transform.index { Some(Index::Tag) => SemanticType::Tag, Some(Index::Time) => SemanticType::Timestamp, @@ -478,6 +482,7 @@ mod tests { default: None, index: None, on_failure: None, + tag: false, }; // valid string @@ -503,6 +508,7 @@ mod tests { default: None, index: None, on_failure: Some(OnFailure::Ignore), + tag: false, }; let val = Value::String("hello".to_string()); @@ -518,6 +524,7 @@ mod tests { default: None, index: None, on_failure: Some(OnFailure::Default), + tag: false, }; // with no explicit default value diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index c6fd59a631..154034dd67 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1271,9 +1271,11 @@ transform: - field: type type: string index: skipping + tag: true - field: log type: string index: fulltext + tag: true - field: time type: time index: timestamp @@ -1349,7 +1351,7 @@ transform: // 3. check schema - let expected_schema = "[[\"logs1\",\"CREATE TABLE IF NOT EXISTS \\\"logs1\\\" (\\n \\\"id1\\\" INT NULL,\\n \\\"id2\\\" INT NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"type\\\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\\n \\\"log\\\" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', case_sensitive = 'false'),\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]"; + let expected_schema = "[[\"logs1\",\"CREATE TABLE IF NOT EXISTS \\\"logs1\\\" (\\n \\\"id1\\\" INT NULL,\\n \\\"id2\\\" INT NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"type\\\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\\n \\\"log\\\" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', case_sensitive = 'false'),\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\"),\\n PRIMARY KEY (\\\"type\\\", \\\"log\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]"; validate_data( "pipeline_schema", &client,