test: more on processors (#4493)

* test: add date test

* test: add epoch test

* test: add letter test and complete some others

* test: add urlencoding test

* chore: typo
This commit is contained in:
shuiyisong
2024-08-04 16:29:31 +08:00
committed by GitHub
parent cb4cffe636
commit 3b701d8f5e
8 changed files with 954 additions and 32 deletions

138
src/pipeline/tests/date.rs Normal file
View File

@@ -0,0 +1,138 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod common;
use api::v1::ColumnSchema;
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{ColumnDataType, SemanticType};
use lazy_static::lazy_static;
const TEST_INPUT: &str = r#"
{
"input_str": "2024-06-27T06:13:36.991Z"
}"#;
const TEST_VALUE: Option<ValueData> =
Some(ValueData::TimestampNanosecondValue(1719468816991000000));
lazy_static! {
static ref EXPECTED_SCHEMA: Vec<ColumnSchema> = vec![
common::make_column_schema(
"ts".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
}
#[test]
fn test_parse_date() {
let pipeline_yaml = r#"
processors:
- date:
fields:
- input_str
formats:
- "%Y-%m-%dT%H:%M:%S%.3fZ"
transform:
- fields:
- input_str, ts
type: time
"#;
let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(output.rows[0].values[0].value_data, TEST_VALUE);
}
#[test]
fn test_multi_formats() {
let pipeline_yaml = r#"
processors:
- date:
fields:
- input_str
formats:
- "%Y-%m-%dT%H:%M:%S"
- "%Y-%m-%dT%H:%M:%S%.3fZ"
transform:
- fields:
- input_str, ts
type: time
"#;
let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(output.rows[0].values[0].value_data, TEST_VALUE);
}
#[test]
fn test_ignore_missing() {
let empty_input = r#"{}"#;
let pipeline_yaml = r#"
processors:
- date:
fields:
- input_str
formats:
- "%Y-%m-%dT%H:%M:%S"
- "%Y-%m-%dT%H:%M:%S%.3fZ"
ignore_missing: true
transform:
- fields:
- input_str, ts
type: time
"#;
let output = common::parse_and_exec(empty_input, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(output.rows[0].values[0].value_data, None);
}
#[test]
fn test_timezone() {
let pipeline_yaml = r#"
processors:
- date:
fields:
- input_str
formats:
- "%Y-%m-%dT%H:%M:%S"
- "%Y-%m-%dT%H:%M:%S%.3fZ"
ignore_missing: true
timezone: 'Asia/Shanghai'
transform:
- fields:
- input_str, ts
type: time
"#;
let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(
output.rows[0].values[0].value_data,
Some(ValueData::TimestampNanosecondValue(1719440016991000000))
);
}

View File

@@ -17,6 +17,10 @@ mod common;
use greptime_proto::v1::value::ValueData::StringValue;
use greptime_proto::v1::{ColumnDataType, SemanticType};
fn make_string_column_schema(name: String) -> greptime_proto::v1::ColumnSchema {
common::make_column_schema(name, ColumnDataType::String, SemanticType::Field)
}
#[test]
fn test_dissect_pattern() {
let input_value_str = r#"
@@ -43,8 +47,8 @@ transform:
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
let expected_schema = vec![
common::make_column_schema("a".to_string(), ColumnDataType::String, SemanticType::Field),
common::make_column_schema("b".to_string(), ColumnDataType::String, SemanticType::Field),
make_string_column_schema("a".to_string()),
make_string_column_schema("b".to_string()),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
@@ -91,8 +95,8 @@ transform:
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
let expected_schema = vec![
common::make_column_schema("a".to_string(), ColumnDataType::String, SemanticType::Field),
common::make_column_schema("b".to_string(), ColumnDataType::String, SemanticType::Field),
make_string_column_schema("a".to_string()),
make_string_column_schema("b".to_string()),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
@@ -111,3 +115,141 @@ transform:
Some(StringValue("456".to_string()))
);
}
#[test]
fn test_ignore_missing() {
let empty_str = r#"{}"#;
let pipeline_yaml = r#"
processors:
- dissect:
field: str
patterns:
- "%{a} %{b}"
ignore_missing: true
transform:
- fields:
- a
- b
type: string
"#;
let output = common::parse_and_exec(empty_str, pipeline_yaml);
let expected_schema = vec![
make_string_column_schema("a".to_string()),
make_string_column_schema("b".to_string()),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
assert_eq!(output.schema, expected_schema);
assert_eq!(output.rows[0].values[0].value_data, None);
assert_eq!(output.rows[0].values[1].value_data, None);
}
#[test]
fn test_modifier() {
let empty_str = r#"
{
"str": "key1 key2 key3 key4 key5 key6 key7 key8"
}"#;
let pipeline_yaml = r#"
processors:
- dissect:
field: str
patterns:
- "%{key1} %{key2} %{+key3} %{+key3/2} %{key5->} %{?key6} %{*key_7} %{&key_7}"
transform:
- fields:
- key1
- key2
- key3
- key5
- key7
type: string
"#;
let output = common::parse_and_exec(empty_str, pipeline_yaml);
let expected_schema = vec![
make_string_column_schema("key1".to_string()),
make_string_column_schema("key2".to_string()),
make_string_column_schema("key3".to_string()),
make_string_column_schema("key5".to_string()),
make_string_column_schema("key7".to_string()),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
assert_eq!(output.schema, expected_schema);
assert_eq!(
output.rows[0].values[0].value_data,
Some(StringValue("key1".to_string()))
);
assert_eq!(
output.rows[0].values[1].value_data,
Some(StringValue("key2".to_string()))
);
assert_eq!(
output.rows[0].values[2].value_data,
Some(StringValue("key3 key4".to_string()))
);
assert_eq!(
output.rows[0].values[3].value_data,
Some(StringValue("key5".to_string()))
);
assert_eq!(
output.rows[0].values[4].value_data,
Some(StringValue("key8".to_string()))
);
}
#[test]
fn test_append_separator() {
let empty_str = r#"
{
"str": "key1 key2"
}"#;
let pipeline_yaml = r#"
processors:
- dissect:
field: str
patterns:
- "%{+key1} %{+key1}"
append_separator: "_"
transform:
- fields:
- key1
type: string
"#;
let output = common::parse_and_exec(empty_str, pipeline_yaml);
let expected_schema = vec![
make_string_column_schema("key1".to_string()),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
assert_eq!(output.schema, expected_schema);
assert_eq!(
output.rows[0].values[0].value_data,
Some(StringValue("key1_key2".to_string()))
);
}

255
src/pipeline/tests/epoch.rs Normal file
View File

@@ -0,0 +1,255 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod common;
use api::v1::ColumnSchema;
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{ColumnDataType, SemanticType};
#[test]
fn test_parse_epoch() {
let test_input = r#"
{
"input_s": "1722580862",
"input_sec": "1722580862",
"input_second": "1722580862",
"input_ms": "1722580887794",
"input_millisecond": "1722580887794",
"input_milli": "1722580887794",
"input_default": "1722580887794",
"input_us": "1722580905423969",
"input_microsecond": "1722580905423969",
"input_micro": "1722580905423969",
"input_ns": "1722580929863842048",
"input_nanosecond": "1722580929863842048",
"input_nano": "1722580929863842048"
}"#;
let pipeline_yaml = r#"
processors:
- epoch:
field: input_s
resolution: s
- epoch:
field: input_sec
resolution: sec
- epoch:
field: input_second
resolution: second
- epoch:
field: input_ms
resolution: ms
- epoch:
field: input_millisecond
resolution: millisecond
- epoch:
field: input_milli
resolution: milli
- epoch:
field: input_default
- epoch:
field: input_us
resolution: us
- epoch:
field: input_microsecond
resolution: microsecond
- epoch:
field: input_micro
resolution: micro
- epoch:
field: input_ns
resolution: ns
- epoch:
field: input_nanosecond
resolution: nanosecond
- epoch:
field: input_nano
resolution: nano
transform:
- field: input_s
type: epoch, s
- field: input_sec
type: epoch, sec
- field: input_second
type: epoch, second
- field: input_ms
type: epoch, ms
- field: input_millisecond
type: epoch, millisecond
- field: input_milli
type: epoch, milli
- field: input_default
type: epoch, milli
- field: input_us
type: epoch, us
- field: input_microsecond
type: epoch, microsecond
- field: input_micro
type: epoch, micro
- field: input_ns
type: epoch, ns
- field: input_nanosecond
type: epoch, nanosecond
- field: input_nano
type: epoch, nano
"#;
fn make_time_field(name: &str, datatype: ColumnDataType) -> ColumnSchema {
common::make_column_schema(name.to_string(), datatype, SemanticType::Field)
}
let expected_schema = vec![
make_time_field("input_s", ColumnDataType::TimestampSecond),
make_time_field("input_sec", ColumnDataType::TimestampSecond),
make_time_field("input_second", ColumnDataType::TimestampSecond),
make_time_field("input_ms", ColumnDataType::TimestampMillisecond),
make_time_field("input_millisecond", ColumnDataType::TimestampMillisecond),
make_time_field("input_milli", ColumnDataType::TimestampMillisecond),
make_time_field("input_default", ColumnDataType::TimestampMillisecond),
make_time_field("input_us", ColumnDataType::TimestampMicrosecond),
make_time_field("input_microsecond", ColumnDataType::TimestampMicrosecond),
make_time_field("input_micro", ColumnDataType::TimestampMicrosecond),
make_time_field("input_ns", ColumnDataType::TimestampNanosecond),
make_time_field("input_nanosecond", ColumnDataType::TimestampNanosecond),
make_time_field("input_nano", ColumnDataType::TimestampNanosecond),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
for i in 0..2 {
assert_eq!(
output.rows[0].values[i].value_data,
Some(ValueData::TimestampSecondValue(1722580862))
);
}
for i in 3..6 {
assert_eq!(
output.rows[0].values[i].value_data,
Some(ValueData::TimestampMillisecondValue(1722580887794))
);
}
for i in 7..9 {
assert_eq!(
output.rows[0].values[i].value_data,
Some(ValueData::TimestampMicrosecondValue(1722580905423969))
);
}
for i in 10..12 {
assert_eq!(
output.rows[0].values[i].value_data,
Some(ValueData::TimestampNanosecondValue(1722580929863842048))
);
}
}
#[test]
fn test_ignore_missing() {
let empty_input = r#"{}"#;
let pipeline_yaml = r#"
processors:
- epoch:
field: input_s
resolution: s
ignore_missing: true
transform:
- fields:
- input_s, ts
type: epoch, s
"#;
let expected_schema = vec![
common::make_column_schema(
"ts".to_string(),
ColumnDataType::TimestampSecond,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
let output = common::parse_and_exec(empty_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
assert_eq!(output.rows[0].values[0].value_data, None);
}
#[test]
fn test_default_wrong_resolution() {
let test_input = r#"
{
"input_s": "1722580862",
"input_nano": "1722583122284583936"
}"#;
let pipeline_yaml = r#"
processors:
- epoch:
fields:
- input_s
- input_nano
transform:
- fields:
- input_s
type: epoch, s
- fields:
- input_nano
type: epoch, nano
"#;
let expected_schema = vec![
common::make_column_schema(
"input_s".to_string(),
ColumnDataType::TimestampSecond,
SemanticType::Field,
),
common::make_column_schema(
"input_nano".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
// this is actually wrong
// TODO(shuiyisong): add check for type when converting epoch
assert_eq!(
output.rows[0].values[0].value_data,
Some(ValueData::TimestampMillisecondValue(1722580862))
);
assert_eq!(
output.rows[0].values[1].value_data,
Some(ValueData::TimestampMillisecondValue(1722583122284583936))
);
}

View File

@@ -61,3 +61,37 @@ transform:
Some(TimestampMillisecondValue(1573840000000))
);
}
#[test]
fn test_ignore_missing() {
let empty_string = r#"{}"#;
let pipeline_yaml = r#"
processors:
- gsub:
field: reqTimeSec
pattern: "\\."
replacement: ""
ignore_missing: true
- epoch:
field: reqTimeSec
resolution: millisecond
ignore_missing: true
transform:
- field: reqTimeSec
type: epoch, millisecond
index: timestamp
"#;
let output = common::parse_and_exec(empty_string, pipeline_yaml);
let expected_schema = vec![common::make_column_schema(
"reqTimeSec".to_string(),
ColumnDataType::TimestampMillisecond,
SemanticType::Timestamp,
)];
assert_eq!(output.schema, expected_schema);
assert_eq!(output.rows[0].values[0].value_data, None);
}

View File

@@ -117,3 +117,41 @@ fn test_float() {
Some(StringValue("1.1-1.2-1.3".to_string()))
);
}
#[test]
fn test_mix_type() {
let input_value_str = r#"
[
{
"join_test": [1, true, "a", 1.1]
}
]
"#;
let output = common::parse_and_exec(input_value_str, PIPELINE_YAML);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(
output.rows[0].values[0].value_data,
Some(StringValue("1-true-a-1.1".to_string()))
);
}
#[test]
fn test_ignore_missing() {
let empty_string = r#"{}"#;
let pipeline_yaml = r#"
processors:
- join:
field: join_test
separator: "-"
ignore_missing: true
transform:
- field: join_test
type: string
"#;
let output = common::parse_and_exec(empty_string, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(output.rows[0].values[0].value_data, None);
}

View File

@@ -0,0 +1,188 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod common;
use api::v1::ColumnSchema;
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{ColumnDataType, SemanticType};
use lazy_static::lazy_static;
lazy_static! {
static ref EXPECTED_SCHEMA: Vec<ColumnSchema> = vec![
common::make_column_schema(
"input_str".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
}
#[test]
fn test_upper() {
let test_input = r#"
{
"input_str": "aaa"
}"#;
let pipeline_yaml = r#"
processors:
- letter:
fields:
- input_str
method: upper
transform:
- fields:
- input_str
type: string
"#;
let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(
output.rows[0].values[0].value_data,
Some(ValueData::StringValue("AAA".to_string()))
);
}
#[test]
fn test_lower() {
let test_input = r#"
{
"input_str": "AAA"
}"#;
let pipeline_yaml = r#"
processors:
- letter:
fields:
- input_str
method: lower
transform:
- fields:
- input_str
type: string
"#;
let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(
output.rows[0].values[0].value_data,
Some(ValueData::StringValue("aaa".to_string()))
);
}
#[test]
fn test_capital() {
let test_input = r#"
{
"upper": "AAA",
"lower": "aaa"
}"#;
let pipeline_yaml = r#"
processors:
- letter:
fields:
- upper
- lower
method: capital
transform:
- fields:
- upper
- lower
type: string
"#;
let expected_schema = vec![
common::make_column_schema(
"upper".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"lower".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
assert_eq!(
output.rows[0].values[0].value_data,
Some(ValueData::StringValue("AAA".to_string()))
);
assert_eq!(
output.rows[0].values[1].value_data,
Some(ValueData::StringValue("Aaa".to_string()))
);
}
#[test]
fn test_ignore_missing() {
let test_input = r#"{}"#;
let pipeline_yaml = r#"
processors:
- letter:
fields:
- upper
- lower
method: capital
ignore_missing: true
transform:
- fields:
- upper
- lower
type: string
"#;
let expected_schema = vec![
common::make_column_schema(
"upper".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"lower".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
assert_eq!(output.rows[0].values[0].value_data, None);
assert_eq!(output.rows[0].values[1].value_data, None);
}

View File

@@ -14,8 +14,25 @@
mod common;
use api::v1::ColumnSchema;
use greptime_proto::v1::value::ValueData::StringValue;
use greptime_proto::v1::{ColumnDataType, SemanticType};
use lazy_static::lazy_static;
lazy_static! {
static ref EXPECTED_SCHEMA: Vec<ColumnSchema> = vec![
common::make_column_schema(
"str_id".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
}
#[test]
fn test_regex_pattern() {
@@ -41,20 +58,7 @@ transform:
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
let expected_schema = vec![
common::make_column_schema(
"str_id".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
assert_eq!(output.schema, expected_schema);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(
output.rows[0].values[0].value_data,
@@ -87,23 +91,34 @@ transform:
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
let expected_schema = vec![
common::make_column_schema(
"str_id".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
assert_eq!(output.schema, expected_schema);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(
output.rows[0].values[0].value_data,
Some(StringValue("123".to_string()))
);
}
#[test]
fn test_ignore_missing() {
let input_value_str = r#"{}"#;
let pipeline_yaml = r#"
processors:
- regex:
fields:
- str
pattern: "(?<id>\\d+)"
ignore_missing: true
transform:
- field: str_id
type: string
"#;
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(output.rows[0].values[0].value_data, None);
}

View File

@@ -0,0 +1,112 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod common;
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{ColumnDataType, SemanticType};
#[test]
fn test() {
let test_input = r#"
{
"encoding": "2024-06-27T06:13:36.991Z",
"decoding": "2024-06-27T06%3A13%3A36.991Z"
}"#;
let pipeline_yaml = r#"
processors:
- urlencoding:
field: encoding
method: encode
- urlencoding:
field: decoding
method: decode
transform:
- fields:
- encoding
- decoding
type: string
"#;
let expected_schema = vec![
common::make_column_schema(
"encoding".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"decoding".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
assert_eq!(
output.rows[0].values[0].value_data,
Some(ValueData::StringValue(
"2024-06-27T06%3A13%3A36.991Z".to_string()
))
);
assert_eq!(
output.rows[0].values[1].value_data,
Some(ValueData::StringValue(
"2024-06-27T06:13:36.991Z".to_string()
))
);
}
#[test]
fn test_ignore_missing() {
let test_input = r#"{}"#;
let pipeline_yaml = r#"
processors:
- urlencoding:
field: encoding
method: encode
ignore_missing: true
transform:
- fields:
- encoding
type: string
"#;
let expected_schema = vec![
common::make_column_schema(
"encoding".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
assert_eq!(output.rows[0].values[0].value_data, None);
}