feat(pipeline): allow coercing timestamps from integer values (#5270)

* feat: allow coercing timestamps from integer values

* test: add tests for parsing integer values
This commit is contained in:
Ning Sun
2025-01-06 16:53:09 +08:00
committed by GitHub
parent ddf36c8324
commit 04708f10aa
2 changed files with 61 additions and 19 deletions

View File

@@ -257,7 +257,7 @@ fn coerce_bool_value(b: bool, transform: &Transform) -> Result<Option<ValueData>
}
fn coerce_i64_value(n: i64, transform: &Transform) -> Result<Option<ValueData>> {
let val = match transform.type_ {
let val = match &transform.type_ {
Value::Int8(_) => ValueData::I8Value(n as i32),
Value::Int16(_) => ValueData::I16Value(n as i32),
Value::Int32(_) => ValueData::I32Value(n as i32),
@@ -274,14 +274,11 @@ fn coerce_i64_value(n: i64, transform: &Transform) -> Result<Option<ValueData>>
Value::Boolean(_) => ValueData::BoolValue(n != 0),
Value::String(_) => ValueData::StringValue(n.to_string()),
Value::Timestamp(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail();
}
None => {
return CoerceUnsupportedEpochTypeSnafu { ty: "Integer" }.fail();
}
Value::Timestamp(unit) => match unit {
Timestamp::Nanosecond(_) => ValueData::TimestampNanosecondValue(n),
Timestamp::Microsecond(_) => ValueData::TimestampMicrosecondValue(n),
Timestamp::Millisecond(_) => ValueData::TimestampMillisecondValue(n),
Timestamp::Second(_) => ValueData::TimestampSecondValue(n),
},
Value::Array(_) | Value::Map(_) => {
@@ -298,7 +295,7 @@ fn coerce_i64_value(n: i64, transform: &Transform) -> Result<Option<ValueData>>
}
fn coerce_u64_value(n: u64, transform: &Transform) -> Result<Option<ValueData>> {
let val = match transform.type_ {
let val = match &transform.type_ {
Value::Int8(_) => ValueData::I8Value(n as i32),
Value::Int16(_) => ValueData::I16Value(n as i32),
Value::Int32(_) => ValueData::I32Value(n as i32),
@@ -315,14 +312,11 @@ fn coerce_u64_value(n: u64, transform: &Transform) -> Result<Option<ValueData>>
Value::Boolean(_) => ValueData::BoolValue(n != 0),
Value::String(_) => ValueData::StringValue(n.to_string()),
Value::Timestamp(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail();
}
None => {
return CoerceUnsupportedEpochTypeSnafu { ty: "Integer" }.fail();
}
Value::Timestamp(unit) => match unit {
Timestamp::Nanosecond(_) => ValueData::TimestampNanosecondValue(n as i64),
Timestamp::Microsecond(_) => ValueData::TimestampMicrosecondValue(n as i64),
Timestamp::Millisecond(_) => ValueData::TimestampMillisecondValue(n as i64),
Timestamp::Second(_) => ValueData::TimestampSecondValue(n as i64),
},
Value::Array(_) | Value::Map(_) => {

View File

@@ -119,7 +119,7 @@ transform:
field: input_s
resolution: s
ignore_missing: true
transform:
- fields:
- input_s, ts
@@ -372,3 +372,51 @@ transform:
Some(ValueData::TimestampMillisecondValue(1722583122284))
);
}
/// Checks that integer JSON values are coerced straight into timestamp columns
/// when the pipeline only declares `transform` entries (no processors).
///
/// `input_s` is declared as `timestamp, s` and `input_nano` as `timestamp, ns`.
/// The test asserts both the resulting column types and that the integer
/// values come out unchanged in the matching timestamp variants.
#[test]
fn test_timestamp_without_processor() {
    let test_input = r#"
{
"input_s": 1722580862,
"input_nano": 1722583122284583936
}"#;

    // YAML is indentation-sensitive: each transform item is a mapping that
    // holds both `fields` and `type`, so `type` must align with `fields` and
    // the field names must be nested beneath `fields`.
    let pipeline_yaml = r#"
transform:
  - fields:
      - input_s
    type: timestamp, s
  - fields:
      - input_nano
    type: timestamp, ns
"#;

    let expected_schema = vec![
        common::make_column_schema(
            "input_s".to_string(),
            ColumnDataType::TimestampSecond,
            SemanticType::Field,
        ),
        common::make_column_schema(
            "input_nano".to_string(),
            ColumnDataType::TimestampNanosecond,
            SemanticType::Field,
        ),
        // NOTE(review): the pipeline declares no `greptime_timestamp` field, so
        // this column is presumably appended automatically as the time index —
        // confirm against the pipeline implementation.
        common::make_column_schema(
            "greptime_timestamp".to_string(),
            ColumnDataType::TimestampNanosecond,
            SemanticType::Timestamp,
        ),
    ];

    let output = common::parse_and_exec(test_input, pipeline_yaml);
    assert_eq!(output.schema, expected_schema);

    // Values are expected in the same order as the transform entries.
    let first_row = &output.rows[0];
    assert_eq!(
        first_row.values[0].value_data,
        Some(ValueData::TimestampSecondValue(1722580862))
    );
    assert_eq!(
        first_row.values[1].value_data,
        Some(ValueData::TimestampNanosecondValue(1722583122284583936))
    );
}