diff --git a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs index 00b5cc6624..5f448b386c 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs @@ -257,7 +257,7 @@ fn coerce_bool_value(b: bool, transform: &Transform) -> Result } fn coerce_i64_value(n: i64, transform: &Transform) -> Result> { - let val = match transform.type_ { + let val = match &transform.type_ { Value::Int8(_) => ValueData::I8Value(n as i32), Value::Int16(_) => ValueData::I16Value(n as i32), Value::Int32(_) => ValueData::I32Value(n as i32), @@ -274,14 +274,11 @@ fn coerce_i64_value(n: i64, transform: &Transform) -> Result> Value::Boolean(_) => ValueData::BoolValue(n != 0), Value::String(_) => ValueData::StringValue(n.to_string()), - Value::Timestamp(_) => match transform.on_failure { - Some(OnFailure::Ignore) => return Ok(None), - Some(OnFailure::Default) => { - return CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail(); - } - None => { - return CoerceUnsupportedEpochTypeSnafu { ty: "Integer" }.fail(); - } + Value::Timestamp(unit) => match unit { + Timestamp::Nanosecond(_) => ValueData::TimestampNanosecondValue(n), + Timestamp::Microsecond(_) => ValueData::TimestampMicrosecondValue(n), + Timestamp::Millisecond(_) => ValueData::TimestampMillisecondValue(n), + Timestamp::Second(_) => ValueData::TimestampSecondValue(n), }, Value::Array(_) | Value::Map(_) => { @@ -298,7 +295,7 @@ fn coerce_i64_value(n: i64, transform: &Transform) -> Result> } fn coerce_u64_value(n: u64, transform: &Transform) -> Result> { - let val = match transform.type_ { + let val = match &transform.type_ { Value::Int8(_) => ValueData::I8Value(n as i32), Value::Int16(_) => ValueData::I16Value(n as i32), Value::Int32(_) => ValueData::I32Value(n as i32), @@ -315,14 +312,11 @@ fn coerce_u64_value(n: u64, transform: &Transform) -> Result> Value::Boolean(_) => ValueData::BoolValue(n != 0), Value::String(_) => ValueData::StringValue(n.to_string()), - Value::Timestamp(_) => match transform.on_failure { - Some(OnFailure::Ignore) => return Ok(None), - Some(OnFailure::Default) => { - return CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail(); - } - None => { - return CoerceUnsupportedEpochTypeSnafu { ty: "Integer" }.fail(); - } + Value::Timestamp(unit) => match unit { + Timestamp::Nanosecond(_) => ValueData::TimestampNanosecondValue(n as i64), + Timestamp::Microsecond(_) => ValueData::TimestampMicrosecondValue(n as i64), + Timestamp::Millisecond(_) => ValueData::TimestampMillisecondValue(n as i64), + Timestamp::Second(_) => ValueData::TimestampSecondValue(n as i64), }, Value::Array(_) | Value::Map(_) => { diff --git a/src/pipeline/tests/timestamp.rs b/src/pipeline/tests/timestamp.rs index 85cf0643fc..8f7d3d0bc3 100644 --- a/src/pipeline/tests/timestamp.rs +++ b/src/pipeline/tests/timestamp.rs @@ -119,7 +119,7 @@ transform: field: input_s resolution: s ignore_missing: true - + transform: - fields: - input_s, ts @@ -372,3 +372,51 @@ transform: Some(ValueData::TimestampMillisecondValue(1722583122284)) ); } + +#[test] +fn test_timestamp_without_processor() { + let test_input = r#" + { + "input_s": 1722580862, + "input_nano": 1722583122284583936 + }"#; + + let pipeline_yaml = r#" +transform: + - fields: + - input_s + type: timestamp, s + - fields: + - input_nano + type: timestamp, ns +"#; + + let expected_schema = vec![ + common::make_column_schema( + "input_s".to_string(), + ColumnDataType::TimestampSecond, + SemanticType::Field, + ), + common::make_column_schema( + "input_nano".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Field, + ), + common::make_column_schema( + "greptime_timestamp".to_string(), + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + ), + ]; + + let output = common::parse_and_exec(test_input, pipeline_yaml); + assert_eq!(output.schema, expected_schema); + assert_eq!( + output.rows[0].values[0].value_data, + Some(ValueData::TimestampSecondValue(1722580862)) + ); + assert_eq!( + output.rows[0].values[1].value_data, + Some(ValueData::TimestampNanosecondValue(1722583122284583936)) + ); +}