From 3b701d8f5e97cd4a4e1a7a44d19633c211fce740 Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Sun, 4 Aug 2024 16:29:31 +0800 Subject: [PATCH] test: more on processors (#4493) * test: add date test * test: add epoch test * test: add letter test and complete some others * test: add urlencoding test * chore: typo --- src/pipeline/tests/date.rs | 138 ++++++++++++++++ src/pipeline/tests/dissect.rs | 150 +++++++++++++++++- src/pipeline/tests/epoch.rs | 255 ++++++++++++++++++++++++++++++ src/pipeline/tests/gsub.rs | 34 ++++ src/pipeline/tests/join.rs | 38 +++++ src/pipeline/tests/letter.rs | 188 ++++++++++++++++++++++ src/pipeline/tests/regex.rs | 71 +++++---- src/pipeline/tests/urlencoding.rs | 112 +++++++++++++ 8 files changed, 954 insertions(+), 32 deletions(-) create mode 100644 src/pipeline/tests/date.rs create mode 100644 src/pipeline/tests/epoch.rs create mode 100644 src/pipeline/tests/letter.rs create mode 100644 src/pipeline/tests/urlencoding.rs diff --git a/src/pipeline/tests/date.rs b/src/pipeline/tests/date.rs new file mode 100644 index 0000000000..775f0688c1 --- /dev/null +++ b/src/pipeline/tests/date.rs @@ -0,0 +1,138 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +mod common; + +use api::v1::ColumnSchema; +use greptime_proto::v1::value::ValueData; +use greptime_proto::v1::{ColumnDataType, SemanticType}; +use lazy_static::lazy_static; + +const TEST_INPUT: &str = r#" +{ + "input_str": "2024-06-27T06:13:36.991Z" +}"#; + +const TEST_VALUE: Option<ValueData> = + Some(ValueData::TimestampNanosecondValue(1719468816991000000)); + +lazy_static! { + static ref EXPECTED_SCHEMA: Vec<ColumnSchema> = vec![ + common::make_column_schema( + "ts".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Field, + ), + common::make_column_schema( + "greptime_timestamp".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + ), + ]; +} + +#[test] +fn test_parse_date() { + let pipeline_yaml = r#" +processors: + - date: + fields: + - input_str + formats: + - "%Y-%m-%dT%H:%M:%S%.3fZ" + +transform: + - fields: + - input_str, ts + type: time +"#; + + let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml); + assert_eq!(output.schema, *EXPECTED_SCHEMA); + assert_eq!(output.rows[0].values[0].value_data, TEST_VALUE); +} + +#[test] +fn test_multi_formats() { + let pipeline_yaml = r#" +processors: + - date: + fields: + - input_str + formats: + - "%Y-%m-%dT%H:%M:%S" + - "%Y-%m-%dT%H:%M:%S%.3fZ" + +transform: + - fields: + - input_str, ts + type: time +"#; + + let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml); + assert_eq!(output.schema, *EXPECTED_SCHEMA); + assert_eq!(output.rows[0].values[0].value_data, TEST_VALUE); +} + +#[test] +fn test_ignore_missing() { + let empty_input = r#"{}"#; + + let pipeline_yaml = r#" +processors: + - date: + fields: + - input_str + formats: + - "%Y-%m-%dT%H:%M:%S" + - "%Y-%m-%dT%H:%M:%S%.3fZ" + ignore_missing: true + +transform: + - fields: + - input_str, ts + type: time +"#; + + let output = common::parse_and_exec(empty_input, pipeline_yaml); + assert_eq!(output.schema, *EXPECTED_SCHEMA); + assert_eq!(output.rows[0].values[0].value_data, None); +} + +#[test] +fn test_timezone() { + let 
pipeline_yaml = r#" +processors: + - date: + fields: + - input_str + formats: + - "%Y-%m-%dT%H:%M:%S" + - "%Y-%m-%dT%H:%M:%S%.3fZ" + ignore_missing: true + timezone: 'Asia/Shanghai' + +transform: + - fields: + - input_str, ts + type: time +"#; + + let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml); + assert_eq!(output.schema, *EXPECTED_SCHEMA); + assert_eq!( + output.rows[0].values[0].value_data, + Some(ValueData::TimestampNanosecondValue(1719440016991000000)) + ); +} diff --git a/src/pipeline/tests/dissect.rs b/src/pipeline/tests/dissect.rs index bc9ca263ca..10f9e27996 100644 --- a/src/pipeline/tests/dissect.rs +++ b/src/pipeline/tests/dissect.rs @@ -17,6 +17,10 @@ mod common; use greptime_proto::v1::value::ValueData::StringValue; use greptime_proto::v1::{ColumnDataType, SemanticType}; +fn make_string_column_schema(name: String) -> greptime_proto::v1::ColumnSchema { + common::make_column_schema(name, ColumnDataType::String, SemanticType::Field) +} + #[test] fn test_dissect_pattern() { let input_value_str = r#" @@ -43,8 +47,8 @@ transform: let output = common::parse_and_exec(input_value_str, pipeline_yaml); let expected_schema = vec![ - common::make_column_schema("a".to_string(), ColumnDataType::String, SemanticType::Field), - common::make_column_schema("b".to_string(), ColumnDataType::String, SemanticType::Field), + make_string_column_schema("a".to_string()), + make_string_column_schema("b".to_string()), common::make_column_schema( "greptime_timestamp".to_string(), ColumnDataType::TimestampNanosecond, @@ -91,8 +95,8 @@ transform: let output = common::parse_and_exec(input_value_str, pipeline_yaml); let expected_schema = vec![ - common::make_column_schema("a".to_string(), ColumnDataType::String, SemanticType::Field), - common::make_column_schema("b".to_string(), ColumnDataType::String, SemanticType::Field), + make_string_column_schema("a".to_string()), + make_string_column_schema("b".to_string()), common::make_column_schema( 
"greptime_timestamp".to_string(), ColumnDataType::TimestampNanosecond, @@ -111,3 +115,141 @@ transform: Some(StringValue("456".to_string())) ); } + +#[test] +fn test_ignore_missing() { + let empty_str = r#"{}"#; + + let pipeline_yaml = r#" +processors: + - dissect: + field: str + patterns: + - "%{a} %{b}" + ignore_missing: true + +transform: + - fields: + - a + - b + type: string +"#; + + let output = common::parse_and_exec(empty_str, pipeline_yaml); + + let expected_schema = vec![ + make_string_column_schema("a".to_string()), + make_string_column_schema("b".to_string()), + common::make_column_schema( + "greptime_timestamp".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + ), + ]; + + assert_eq!(output.schema, expected_schema); + + assert_eq!(output.rows[0].values[0].value_data, None); + assert_eq!(output.rows[0].values[1].value_data, None); +} + +#[test] +fn test_modifier() { + let empty_str = r#" +{ + "str": "key1 key2 key3 key4 key5 key6 key7 key8" +}"#; + + let pipeline_yaml = r#" +processors: + - dissect: + field: str + patterns: + - "%{key1} %{key2} %{+key3} %{+key3/2} %{key5->} %{?key6} %{*key_7} %{&key_7}" + +transform: + - fields: + - key1 + - key2 + - key3 + - key5 + - key7 + type: string +"#; + + let output = common::parse_and_exec(empty_str, pipeline_yaml); + + let expected_schema = vec![ + make_string_column_schema("key1".to_string()), + make_string_column_schema("key2".to_string()), + make_string_column_schema("key3".to_string()), + make_string_column_schema("key5".to_string()), + make_string_column_schema("key7".to_string()), + common::make_column_schema( + "greptime_timestamp".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + ), + ]; + + assert_eq!(output.schema, expected_schema); + assert_eq!( + output.rows[0].values[0].value_data, + Some(StringValue("key1".to_string())) + ); + assert_eq!( + output.rows[0].values[1].value_data, + Some(StringValue("key2".to_string())) + ); + assert_eq!( + 
output.rows[0].values[2].value_data, + Some(StringValue("key3 key4".to_string())) + ); + assert_eq!( + output.rows[0].values[3].value_data, + Some(StringValue("key5".to_string())) + ); + assert_eq!( + output.rows[0].values[4].value_data, + Some(StringValue("key8".to_string())) + ); +} + +#[test] +fn test_append_separator() { + let empty_str = r#" +{ + "str": "key1 key2" +}"#; + + let pipeline_yaml = r#" +processors: + - dissect: + field: str + patterns: + - "%{+key1} %{+key1}" + append_separator: "_" + +transform: + - fields: + - key1 + type: string +"#; + + let output = common::parse_and_exec(empty_str, pipeline_yaml); + + let expected_schema = vec![ + make_string_column_schema("key1".to_string()), + common::make_column_schema( + "greptime_timestamp".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + ), + ]; + + assert_eq!(output.schema, expected_schema); + assert_eq!( + output.rows[0].values[0].value_data, + Some(StringValue("key1_key2".to_string())) + ); +} diff --git a/src/pipeline/tests/epoch.rs b/src/pipeline/tests/epoch.rs new file mode 100644 index 0000000000..35a2ab635c --- /dev/null +++ b/src/pipeline/tests/epoch.rs @@ -0,0 +1,255 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +mod common; + +use api::v1::ColumnSchema; +use greptime_proto::v1::value::ValueData; +use greptime_proto::v1::{ColumnDataType, SemanticType}; + +#[test] +fn test_parse_epoch() { + let test_input = r#" + { + "input_s": "1722580862", + "input_sec": "1722580862", + "input_second": "1722580862", + "input_ms": "1722580887794", + "input_millisecond": "1722580887794", + "input_milli": "1722580887794", + "input_default": "1722580887794", + "input_us": "1722580905423969", + "input_microsecond": "1722580905423969", + "input_micro": "1722580905423969", + "input_ns": "1722580929863842048", + "input_nanosecond": "1722580929863842048", + "input_nano": "1722580929863842048" + }"#; + + let pipeline_yaml = r#" +processors: + - epoch: + field: input_s + resolution: s + - epoch: + field: input_sec + resolution: sec + - epoch: + field: input_second + resolution: second + - epoch: + field: input_ms + resolution: ms + - epoch: + field: input_millisecond + resolution: millisecond + - epoch: + field: input_milli + resolution: milli + - epoch: + field: input_default + - epoch: + field: input_us + resolution: us + - epoch: + field: input_microsecond + resolution: microsecond + - epoch: + field: input_micro + resolution: micro + - epoch: + field: input_ns + resolution: ns + - epoch: + field: input_nanosecond + resolution: nanosecond + - epoch: + field: input_nano + resolution: nano + +transform: + - field: input_s + type: epoch, s + - field: input_sec + type: epoch, sec + - field: input_second + type: epoch, second + + - field: input_ms + type: epoch, ms + - field: input_millisecond + type: epoch, millisecond + - field: input_milli + type: epoch, milli + - field: input_default + type: epoch, milli + + - field: input_us + type: epoch, us + - field: input_microsecond + type: epoch, microsecond + - field: input_micro + type: epoch, micro + + - field: input_ns + type: epoch, ns + - field: input_nanosecond + type: epoch, nanosecond + - field: input_nano + type: epoch, nano +"#; + fn 
make_time_field(name: &str, datatype: ColumnDataType) -> ColumnSchema { + common::make_column_schema(name.to_string(), datatype, SemanticType::Field) + } + + let expected_schema = vec![ + make_time_field("input_s", ColumnDataType::TimestampSecond), + make_time_field("input_sec", ColumnDataType::TimestampSecond), + make_time_field("input_second", ColumnDataType::TimestampSecond), + make_time_field("input_ms", ColumnDataType::TimestampMillisecond), + make_time_field("input_millisecond", ColumnDataType::TimestampMillisecond), + make_time_field("input_milli", ColumnDataType::TimestampMillisecond), + make_time_field("input_default", ColumnDataType::TimestampMillisecond), + make_time_field("input_us", ColumnDataType::TimestampMicrosecond), + make_time_field("input_microsecond", ColumnDataType::TimestampMicrosecond), + make_time_field("input_micro", ColumnDataType::TimestampMicrosecond), + make_time_field("input_ns", ColumnDataType::TimestampNanosecond), + make_time_field("input_nanosecond", ColumnDataType::TimestampNanosecond), + make_time_field("input_nano", ColumnDataType::TimestampNanosecond), + common::make_column_schema( + "greptime_timestamp".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + ), + ]; + + let output = common::parse_and_exec(test_input, pipeline_yaml); + assert_eq!(output.schema, expected_schema); + + for i in 0..2 { + assert_eq!( + output.rows[0].values[i].value_data, + Some(ValueData::TimestampSecondValue(1722580862)) + ); + } + for i in 3..6 { + assert_eq!( + output.rows[0].values[i].value_data, + Some(ValueData::TimestampMillisecondValue(1722580887794)) + ); + } + for i in 7..9 { + assert_eq!( + output.rows[0].values[i].value_data, + Some(ValueData::TimestampMicrosecondValue(1722580905423969)) + ); + } + for i in 10..12 { + assert_eq!( + output.rows[0].values[i].value_data, + Some(ValueData::TimestampNanosecondValue(1722580929863842048)) + ); + } +} + +#[test] +fn test_ignore_missing() { + let empty_input = r#"{}"#; 
+ + let pipeline_yaml = r#" +processors: + - epoch: + field: input_s + resolution: s + ignore_missing: true + +transform: + - fields: + - input_s, ts + type: epoch, s +"#; + + let expected_schema = vec![ + common::make_column_schema( + "ts".to_string(), + ColumnDataType::TimestampSecond, + SemanticType::Field, + ), + common::make_column_schema( + "greptime_timestamp".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + ), + ]; + + let output = common::parse_and_exec(empty_input, pipeline_yaml); + assert_eq!(output.schema, expected_schema); + assert_eq!(output.rows[0].values[0].value_data, None); +} + +#[test] +fn test_default_wrong_resolution() { + let test_input = r#" + { + "input_s": "1722580862", + "input_nano": "1722583122284583936" + }"#; + + let pipeline_yaml = r#" +processors: + - epoch: + fields: + - input_s + - input_nano + +transform: + - fields: + - input_s + type: epoch, s + - fields: + - input_nano + type: epoch, nano +"#; + + let expected_schema = vec![ + common::make_column_schema( + "input_s".to_string(), + ColumnDataType::TimestampSecond, + SemanticType::Field, + ), + common::make_column_schema( + "input_nano".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Field, + ), + common::make_column_schema( + "greptime_timestamp".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + ), + ]; + + let output = common::parse_and_exec(test_input, pipeline_yaml); + assert_eq!(output.schema, expected_schema); + // this is actually wrong + // TODO(shuiyisong): add check for type when converting epoch + assert_eq!( + output.rows[0].values[0].value_data, + Some(ValueData::TimestampMillisecondValue(1722580862)) + ); + assert_eq!( + output.rows[0].values[1].value_data, + Some(ValueData::TimestampMillisecondValue(1722583122284583936)) + ); +} diff --git a/src/pipeline/tests/gsub.rs b/src/pipeline/tests/gsub.rs index 2f336923e8..b7044b9a18 100644 --- a/src/pipeline/tests/gsub.rs +++ 
b/src/pipeline/tests/gsub.rs @@ -61,3 +61,37 @@ transform: Some(TimestampMillisecondValue(1573840000000)) ); } + +#[test] +fn test_ignore_missing() { + let empty_string = r#"{}"#; + + let pipeline_yaml = r#" +processors: + - gsub: + field: reqTimeSec + pattern: "\\." + replacement: "" + ignore_missing: true + - epoch: + field: reqTimeSec + resolution: millisecond + ignore_missing: true + +transform: + - field: reqTimeSec + type: epoch, millisecond + index: timestamp +"#; + + let output = common::parse_and_exec(empty_string, pipeline_yaml); + + let expected_schema = vec![common::make_column_schema( + "reqTimeSec".to_string(), + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + )]; + + assert_eq!(output.schema, expected_schema); + assert_eq!(output.rows[0].values[0].value_data, None); +} diff --git a/src/pipeline/tests/join.rs b/src/pipeline/tests/join.rs index 9ffa35909c..3625160361 100644 --- a/src/pipeline/tests/join.rs +++ b/src/pipeline/tests/join.rs @@ -117,3 +117,41 @@ fn test_float() { Some(StringValue("1.1-1.2-1.3".to_string())) ); } + +#[test] +fn test_mix_type() { + let input_value_str = r#" + [ + { + "join_test": [1, true, "a", 1.1] + } + ] +"#; + let output = common::parse_and_exec(input_value_str, PIPELINE_YAML); + + assert_eq!(output.schema, *EXPECTED_SCHEMA); + assert_eq!( + output.rows[0].values[0].value_data, + Some(StringValue("1-true-a-1.1".to_string())) + ); +} + +#[test] +fn test_ignore_missing() { + let empty_string = r#"{}"#; + let pipeline_yaml = r#" +processors: + - join: + field: join_test + separator: "-" + ignore_missing: true + +transform: + - field: join_test + type: string +"#; + let output = common::parse_and_exec(empty_string, pipeline_yaml); + + assert_eq!(output.schema, *EXPECTED_SCHEMA); + assert_eq!(output.rows[0].values[0].value_data, None); +} diff --git a/src/pipeline/tests/letter.rs b/src/pipeline/tests/letter.rs new file mode 100644 index 0000000000..d6d9a2cccb --- /dev/null +++ 
b/src/pipeline/tests/letter.rs @@ -0,0 +1,188 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod common; + +use api::v1::ColumnSchema; +use greptime_proto::v1::value::ValueData; +use greptime_proto::v1::{ColumnDataType, SemanticType}; +use lazy_static::lazy_static; + +lazy_static! { + static ref EXPECTED_SCHEMA: Vec<ColumnSchema> = vec![ + common::make_column_schema( + "input_str".to_string(), + ColumnDataType::String, + SemanticType::Field, + ), + common::make_column_schema( + "greptime_timestamp".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + ), + ]; +} + +#[test] +fn test_upper() { + let test_input = r#" +{ + "input_str": "aaa" +}"#; + + let pipeline_yaml = r#" +processors: + - letter: + fields: + - input_str + method: upper + +transform: + - fields: + - input_str + type: string +"#; + + let output = common::parse_and_exec(test_input, pipeline_yaml); + assert_eq!(output.schema, *EXPECTED_SCHEMA); + assert_eq!( + output.rows[0].values[0].value_data, + Some(ValueData::StringValue("AAA".to_string())) + ); +} + +#[test] +fn test_lower() { + let test_input = r#" +{ + "input_str": "AAA" +}"#; + + let pipeline_yaml = r#" +processors: + - letter: + fields: + - input_str + method: lower + +transform: + - fields: + - input_str + type: string +"#; + + let output = common::parse_and_exec(test_input, pipeline_yaml); + assert_eq!(output.schema, *EXPECTED_SCHEMA); + assert_eq!( + 
output.rows[0].values[0].value_data, + Some(ValueData::StringValue("aaa".to_string())) + ); +} + +#[test] +fn test_capital() { + let test_input = r#" +{ + "upper": "AAA", + "lower": "aaa" +}"#; + + let pipeline_yaml = r#" +processors: + - letter: + fields: + - upper + - lower + method: capital + +transform: + - fields: + - upper + - lower + type: string +"#; + + let expected_schema = vec![ + common::make_column_schema( + "upper".to_string(), + ColumnDataType::String, + SemanticType::Field, + ), + common::make_column_schema( + "lower".to_string(), + ColumnDataType::String, + SemanticType::Field, + ), + common::make_column_schema( + "greptime_timestamp".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + ), + ]; + + let output = common::parse_and_exec(test_input, pipeline_yaml); + assert_eq!(output.schema, expected_schema); + assert_eq!( + output.rows[0].values[0].value_data, + Some(ValueData::StringValue("AAA".to_string())) + ); + assert_eq!( + output.rows[0].values[1].value_data, + Some(ValueData::StringValue("Aaa".to_string())) + ); +} + +#[test] +fn test_ignore_missing() { + let test_input = r#"{}"#; + + let pipeline_yaml = r#" +processors: + - letter: + fields: + - upper + - lower + method: capital + ignore_missing: true + +transform: + - fields: + - upper + - lower + type: string +"#; + + let expected_schema = vec![ + common::make_column_schema( + "upper".to_string(), + ColumnDataType::String, + SemanticType::Field, + ), + common::make_column_schema( + "lower".to_string(), + ColumnDataType::String, + SemanticType::Field, + ), + common::make_column_schema( + "greptime_timestamp".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + ), + ]; + + let output = common::parse_and_exec(test_input, pipeline_yaml); + assert_eq!(output.schema, expected_schema); + assert_eq!(output.rows[0].values[0].value_data, None); + assert_eq!(output.rows[0].values[1].value_data, None); +} diff --git a/src/pipeline/tests/regex.rs 
b/src/pipeline/tests/regex.rs index 5519c61395..5be60c9875 100644 --- a/src/pipeline/tests/regex.rs +++ b/src/pipeline/tests/regex.rs @@ -14,8 +14,25 @@ mod common; +use api::v1::ColumnSchema; use greptime_proto::v1::value::ValueData::StringValue; use greptime_proto::v1::{ColumnDataType, SemanticType}; +use lazy_static::lazy_static; + +lazy_static! { + static ref EXPECTED_SCHEMA: Vec<ColumnSchema> = vec![ + common::make_column_schema( + "str_id".to_string(), + ColumnDataType::String, + SemanticType::Field, + ), + common::make_column_schema( + "greptime_timestamp".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + ), + ]; +} #[test] fn test_regex_pattern() { @@ -41,20 +58,7 @@ transform: let output = common::parse_and_exec(input_value_str, pipeline_yaml); - let expected_schema = vec![ - common::make_column_schema( - "str_id".to_string(), - ColumnDataType::String, - SemanticType::Field, - ), - common::make_column_schema( - "greptime_timestamp".to_string(), - ColumnDataType::TimestampNanosecond, - SemanticType::Timestamp, - ), - ]; - - assert_eq!(output.schema, expected_schema); + assert_eq!(output.schema, *EXPECTED_SCHEMA); assert_eq!( output.rows[0].values[0].value_data, @@ -87,23 +91,34 @@ transform: let output = common::parse_and_exec(input_value_str, pipeline_yaml); - let expected_schema = vec![ - common::make_column_schema( - "str_id".to_string(), - ColumnDataType::String, - SemanticType::Field, - ), - common::make_column_schema( - "greptime_timestamp".to_string(), - ColumnDataType::TimestampNanosecond, - SemanticType::Timestamp, - ), - ]; - - assert_eq!(output.schema, expected_schema); + assert_eq!(output.schema, *EXPECTED_SCHEMA); assert_eq!( output.rows[0].values[0].value_data, Some(StringValue("123".to_string())) ); } + +#[test] +fn test_ignore_missing() { + let input_value_str = r#"{}"#; + + let pipeline_yaml = r#" +processors: + - regex: + fields: + - str + pattern: "(?<id>\\d+)" + ignore_missing: true + +transform: + - field: str_id + type: 
string +"#; + + let output = common::parse_and_exec(input_value_str, pipeline_yaml); + + assert_eq!(output.schema, *EXPECTED_SCHEMA); + + assert_eq!(output.rows[0].values[0].value_data, None); +} diff --git a/src/pipeline/tests/urlencoding.rs b/src/pipeline/tests/urlencoding.rs new file mode 100644 index 0000000000..dd0c4ffe9f --- /dev/null +++ b/src/pipeline/tests/urlencoding.rs @@ -0,0 +1,112 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod common; + +use greptime_proto::v1::value::ValueData; +use greptime_proto::v1::{ColumnDataType, SemanticType}; + +#[test] +fn test() { + let test_input = r#" +{ + "encoding": "2024-06-27T06:13:36.991Z", + "decoding": "2024-06-27T06%3A13%3A36.991Z" +}"#; + + let pipeline_yaml = r#" +processors: + - urlencoding: + field: encoding + method: encode + + - urlencoding: + field: decoding + method: decode + +transform: + - fields: + - encoding + - decoding + type: string +"#; + + let expected_schema = vec![ + common::make_column_schema( + "encoding".to_string(), + ColumnDataType::String, + SemanticType::Field, + ), + common::make_column_schema( + "decoding".to_string(), + ColumnDataType::String, + SemanticType::Field, + ), + common::make_column_schema( + "greptime_timestamp".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + ), + ]; + + let output = common::parse_and_exec(test_input, pipeline_yaml); + assert_eq!(output.schema, expected_schema); + 
assert_eq!( + output.rows[0].values[0].value_data, + Some(ValueData::StringValue( + "2024-06-27T06%3A13%3A36.991Z".to_string() + )) + ); + assert_eq!( + output.rows[0].values[1].value_data, + Some(ValueData::StringValue( + "2024-06-27T06:13:36.991Z".to_string() + )) + ); +} + +#[test] +fn test_ignore_missing() { + let test_input = r#"{}"#; + + let pipeline_yaml = r#" +processors: + - urlencoding: + field: encoding + method: encode + ignore_missing: true + +transform: + - fields: + - encoding + type: string +"#; + + let expected_schema = vec![ + common::make_column_schema( + "encoding".to_string(), + ColumnDataType::String, + SemanticType::Field, + ), + common::make_column_schema( + "greptime_timestamp".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + ), + ]; + + let output = common::parse_and_exec(test_input, pipeline_yaml); + assert_eq!(output.schema, expected_schema); + assert_eq!(output.rows[0].values[0].value_data, None); +}