chore: support tag in transform (#5701)

chore: support tag in transform to specify tag
This commit is contained in:
shuiyisong
2025-03-13 15:27:12 +08:00
committed by GitHub
parent e375a18011
commit 4dc1a1d60f
4 changed files with 22 additions and 1 deletions

View File

@@ -18,6 +18,7 @@ pub mod transformer;
use snafu::OptionExt;
use crate::etl::error::{Error, Result};
use crate::etl::processor::yaml_bool;
use crate::etl::transform::index::Index;
use crate::etl::value::Value;
@@ -25,6 +26,7 @@ const TRANSFORM_FIELD: &str = "field";
const TRANSFORM_FIELDS: &str = "fields";
const TRANSFORM_TYPE: &str = "type";
const TRANSFORM_INDEX: &str = "index";
const TRANSFORM_TAG: &str = "tag";
const TRANSFORM_DEFAULT: &str = "default";
const TRANSFORM_ON_FAILURE: &str = "on_failure";
@@ -144,6 +146,8 @@ pub struct Transform {
pub index: Option<Index>,
pub tag: bool,
pub on_failure: Option<OnFailure>,
}
@@ -154,6 +158,7 @@ impl Default for Transform {
type_: Value::Null,
default: None,
index: None,
tag: false,
on_failure: None,
}
}
@@ -185,6 +190,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for Transform {
let mut type_ = Value::Null;
let mut default = None;
let mut index = None;
let mut tag = false;
let mut on_failure = None;
for (k, v) in hash {
@@ -210,6 +216,10 @@ impl TryFrom<&yaml_rust::yaml::Hash> for Transform {
index = Some(index_str.try_into()?);
}
TRANSFORM_TAG => {
tag = yaml_bool(v, TRANSFORM_TAG)?;
}
TRANSFORM_DEFAULT => {
default = Some(Value::try_from(v)?);
}
@@ -247,6 +257,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for Transform {
default: final_default,
index,
on_failure,
tag,
};
Ok(builder)

View File

@@ -96,6 +96,7 @@ impl GreptimeTransformer {
default,
index: Some(Index::Time),
on_failure: Some(crate::etl::transform::OnFailure::Default),
tag: false,
};
transforms.push(transform);
}

View File

@@ -95,6 +95,10 @@ pub(crate) fn coerce_columns(transform: &Transform) -> Result<Vec<ColumnSchema>>
}
fn coerce_semantic_type(transform: &Transform) -> SemanticType {
if transform.tag {
return SemanticType::Tag;
}
match transform.index {
Some(Index::Tag) => SemanticType::Tag,
Some(Index::Time) => SemanticType::Timestamp,
@@ -478,6 +482,7 @@ mod tests {
default: None,
index: None,
on_failure: None,
tag: false,
};
// valid string
@@ -503,6 +508,7 @@ mod tests {
default: None,
index: None,
on_failure: Some(OnFailure::Ignore),
tag: false,
};
let val = Value::String("hello".to_string());
@@ -518,6 +524,7 @@ mod tests {
default: None,
index: None,
on_failure: Some(OnFailure::Default),
tag: false,
};
// with no explicit default value

View File

@@ -1271,9 +1271,11 @@ transform:
- field: type
type: string
index: skipping
tag: true
- field: log
type: string
index: fulltext
tag: true
- field: time
type: time
index: timestamp
@@ -1349,7 +1351,7 @@ transform:
// 3. check schema
let expected_schema = "[[\"logs1\",\"CREATE TABLE IF NOT EXISTS \\\"logs1\\\" (\\n \\\"id1\\\" INT NULL,\\n \\\"id2\\\" INT NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"type\\\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\\n \\\"log\\\" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', case_sensitive = 'false'),\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
let expected_schema = "[[\"logs1\",\"CREATE TABLE IF NOT EXISTS \\\"logs1\\\" (\\n \\\"id1\\\" INT NULL,\\n \\\"id2\\\" INT NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"type\\\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\\n \\\"log\\\" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', case_sensitive = 'false'),\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\"),\\n PRIMARY KEY (\\\"type\\\", \\\"log\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
validate_data(
"pipeline_schema",
&client,