mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-20 23:10:37 +00:00
fix: ts conversion during transform phase (#4790)
* fix: allow ts conversion during transform phase
* chore: replace `unimplemented` with snafu
This commit is contained in:
@@ -438,6 +438,17 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
#[snafu(display("failed to coerce complex value, not supported"))]
|
||||
CoerceComplexType {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
#[snafu(display("failed to coerce value: {msg}"))]
|
||||
CoerceIncompatibleTypes {
|
||||
msg: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Invalid resolution: '{resolution}'. Available resolutions: {valid_resolution}"
|
||||
|
||||
@@ -20,7 +20,8 @@ use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::etl::error::{
|
||||
CoerceStringToTypeSnafu, CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu,
|
||||
CoerceComplexTypeSnafu, CoerceIncompatibleTypesSnafu, CoerceStringToTypeSnafu,
|
||||
CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu,
|
||||
CoerceUnsupportedNullTypeToSnafu, ColumnOptionsSnafu, Error, Result,
|
||||
};
|
||||
use crate::etl::transform::index::Index;
|
||||
@@ -61,8 +62,7 @@ impl TryFrom<Value> for ValueData {
|
||||
}
|
||||
Value::Timestamp(Timestamp::Second(s)) => Ok(ValueData::TimestampSecondValue(s)),
|
||||
|
||||
Value::Array(_) => unimplemented!("Array type not supported"),
|
||||
Value::Map(_) => unimplemented!("Object type not supported"),
|
||||
Value::Array(_) | Value::Map(_) => CoerceComplexTypeSnafu.fail(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -134,8 +134,7 @@ fn coerce_type(transform: &Transform) -> Result<ColumnDataType> {
|
||||
Value::Timestamp(Timestamp::Millisecond(_)) => Ok(ColumnDataType::TimestampMillisecond),
|
||||
Value::Timestamp(Timestamp::Second(_)) => Ok(ColumnDataType::TimestampSecond),
|
||||
|
||||
Value::Array(_) => unimplemented!("Array"),
|
||||
Value::Map(_) => unimplemented!("Object"),
|
||||
Value::Array(_) | Value::Map(_) => CoerceComplexTypeSnafu.fail(),
|
||||
|
||||
Value::Null => CoerceUnsupportedNullTypeToSnafu {
|
||||
ty: transform.type_.to_str_type(),
|
||||
@@ -176,19 +175,28 @@ pub(crate) fn coerce_value(val: &Value, transform: &Transform) -> Result<Option<
|
||||
Value::Boolean(b) => coerce_bool_value(*b, transform),
|
||||
Value::String(s) => coerce_string_value(s, transform),
|
||||
|
||||
Value::Timestamp(Timestamp::Nanosecond(ns)) => {
|
||||
Ok(Some(ValueData::TimestampNanosecondValue(*ns)))
|
||||
}
|
||||
Value::Timestamp(Timestamp::Microsecond(us)) => {
|
||||
Ok(Some(ValueData::TimestampMicrosecondValue(*us)))
|
||||
}
|
||||
Value::Timestamp(Timestamp::Millisecond(ms)) => {
|
||||
Ok(Some(ValueData::TimestampMillisecondValue(*ms)))
|
||||
}
|
||||
Value::Timestamp(Timestamp::Second(s)) => Ok(Some(ValueData::TimestampSecondValue(*s))),
|
||||
Value::Timestamp(input_timestamp) => match &transform.type_ {
|
||||
Value::Timestamp(target_timestamp) => match target_timestamp {
|
||||
Timestamp::Nanosecond(_) => Ok(Some(ValueData::TimestampNanosecondValue(
|
||||
input_timestamp.timestamp_nanos(),
|
||||
))),
|
||||
Timestamp::Microsecond(_) => Ok(Some(ValueData::TimestampMicrosecondValue(
|
||||
input_timestamp.timestamp_micros(),
|
||||
))),
|
||||
Timestamp::Millisecond(_) => Ok(Some(ValueData::TimestampMillisecondValue(
|
||||
input_timestamp.timestamp_millis(),
|
||||
))),
|
||||
Timestamp::Second(_) => Ok(Some(ValueData::TimestampSecondValue(
|
||||
input_timestamp.timestamp(),
|
||||
))),
|
||||
},
|
||||
_ => CoerceIncompatibleTypesSnafu {
|
||||
msg: "Timestamp can only be coerced to another timestamp",
|
||||
}
|
||||
.fail(),
|
||||
},
|
||||
|
||||
Value::Array(_) => unimplemented!("Array type not supported"),
|
||||
Value::Map(_) => unimplemented!("Object type not supported"),
|
||||
Value::Array(_) | Value::Map(_) => CoerceComplexTypeSnafu.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -220,8 +228,7 @@ fn coerce_bool_value(b: bool, transform: &Transform) -> Result<Option<ValueData>
|
||||
}
|
||||
},
|
||||
|
||||
Value::Array(_) => unimplemented!("Array type not supported"),
|
||||
Value::Map(_) => unimplemented!("Object type not supported"),
|
||||
Value::Array(_) | Value::Map(_) => return CoerceComplexTypeSnafu.fail(),
|
||||
|
||||
Value::Null => return Ok(None),
|
||||
};
|
||||
@@ -257,8 +264,7 @@ fn coerce_i64_value(n: i64, transform: &Transform) -> Result<Option<ValueData>>
|
||||
}
|
||||
},
|
||||
|
||||
Value::Array(_) => unimplemented!("Array type not supported"),
|
||||
Value::Map(_) => unimplemented!("Object type not supported"),
|
||||
Value::Array(_) | Value::Map(_) => return CoerceComplexTypeSnafu.fail(),
|
||||
|
||||
Value::Null => return Ok(None),
|
||||
};
|
||||
@@ -294,8 +300,7 @@ fn coerce_u64_value(n: u64, transform: &Transform) -> Result<Option<ValueData>>
|
||||
}
|
||||
},
|
||||
|
||||
Value::Array(_) => unimplemented!("Array type not supported"),
|
||||
Value::Map(_) => unimplemented!("Object type not supported"),
|
||||
Value::Array(_) | Value::Map(_) => return CoerceComplexTypeSnafu.fail(),
|
||||
|
||||
Value::Null => return Ok(None),
|
||||
};
|
||||
@@ -331,8 +336,7 @@ fn coerce_f64_value(n: f64, transform: &Transform) -> Result<Option<ValueData>>
|
||||
}
|
||||
},
|
||||
|
||||
Value::Array(_) => unimplemented!("Array type not supported"),
|
||||
Value::Map(_) => unimplemented!("Object type not supported"),
|
||||
Value::Array(_) | Value::Map(_) => return CoerceComplexTypeSnafu.fail(),
|
||||
|
||||
Value::Null => return Ok(None),
|
||||
};
|
||||
@@ -407,8 +411,7 @@ fn coerce_string_value(s: &String, transform: &Transform) -> Result<Option<Value
|
||||
None => CoerceUnsupportedEpochTypeSnafu { ty: "String" }.fail(),
|
||||
},
|
||||
|
||||
Value::Array(_) => unimplemented!("Array type not supported"),
|
||||
Value::Map(_) => unimplemented!("Object type not supported"),
|
||||
Value::Array(_) | Value::Map(_) => CoerceComplexTypeSnafu.fail(),
|
||||
|
||||
Value::Null => Ok(None),
|
||||
}
|
||||
|
||||
@@ -200,6 +200,8 @@ transform:
|
||||
|
||||
#[test]
|
||||
fn test_default_wrong_resolution() {
|
||||
// given a number, we have no way to guess its resolution
|
||||
// but we can convert resolution during transform phase
|
||||
let test_input = r#"
|
||||
{
|
||||
"input_s": "1722580862",
|
||||
@@ -209,28 +211,30 @@ fn test_default_wrong_resolution() {
|
||||
let pipeline_yaml = r#"
|
||||
processors:
|
||||
- epoch:
|
||||
fields:
|
||||
- input_s
|
||||
- input_nano
|
||||
field: input_s
|
||||
resolution: s
|
||||
- epoch:
|
||||
field: input_nano
|
||||
resolution: ns
|
||||
|
||||
transform:
|
||||
- fields:
|
||||
- input_s
|
||||
type: epoch, s
|
||||
type: epoch, ms
|
||||
- fields:
|
||||
- input_nano
|
||||
type: epoch, nano
|
||||
type: epoch, ms
|
||||
"#;
|
||||
|
||||
let expected_schema = vec![
|
||||
common::make_column_schema(
|
||||
"input_s".to_string(),
|
||||
ColumnDataType::TimestampSecond,
|
||||
ColumnDataType::TimestampMillisecond,
|
||||
SemanticType::Field,
|
||||
),
|
||||
common::make_column_schema(
|
||||
"input_nano".to_string(),
|
||||
ColumnDataType::TimestampNanosecond,
|
||||
ColumnDataType::TimestampMillisecond,
|
||||
SemanticType::Field,
|
||||
),
|
||||
common::make_column_schema(
|
||||
@@ -242,14 +246,12 @@ transform:
|
||||
|
||||
let output = common::parse_and_exec(test_input, pipeline_yaml);
|
||||
assert_eq!(output.schema, expected_schema);
|
||||
// this is actually wrong
|
||||
// TODO(shuiyisong): add check for type when converting epoch
|
||||
assert_eq!(
|
||||
output.rows[0].values[0].value_data,
|
||||
Some(ValueData::TimestampMillisecondValue(1722580862))
|
||||
Some(ValueData::TimestampMillisecondValue(1722580862000))
|
||||
);
|
||||
assert_eq!(
|
||||
output.rows[0].values[1].value_data,
|
||||
Some(ValueData::TimestampMillisecondValue(1722583122284583936))
|
||||
Some(ValueData::TimestampMillisecondValue(1722583122284))
|
||||
);
|
||||
}
|
||||
|
||||
@@ -318,6 +318,7 @@ transform:
|
||||
|
||||
#[test]
|
||||
fn test_timestamp_default_wrong_resolution() {
|
||||
// same as test_default_wrong_resolution from epoch tests
|
||||
let test_input = r#"
|
||||
{
|
||||
"input_s": "1722580862",
|
||||
@@ -327,28 +328,30 @@ fn test_timestamp_default_wrong_resolution() {
|
||||
let pipeline_yaml = r#"
|
||||
processors:
|
||||
- timestamp:
|
||||
fields:
|
||||
- input_s
|
||||
- input_nano
|
||||
field: input_s
|
||||
resolution: s
|
||||
- timestamp:
|
||||
field: input_nano
|
||||
resolution: ns
|
||||
|
||||
transform:
|
||||
- fields:
|
||||
- input_s
|
||||
type: timestamp, s
|
||||
type: timestamp, ms
|
||||
- fields:
|
||||
- input_nano
|
||||
type: timestamp, nano
|
||||
type: timestamp, ms
|
||||
"#;
|
||||
|
||||
let expected_schema = vec![
|
||||
common::make_column_schema(
|
||||
"input_s".to_string(),
|
||||
ColumnDataType::TimestampSecond,
|
||||
ColumnDataType::TimestampMillisecond,
|
||||
SemanticType::Field,
|
||||
),
|
||||
common::make_column_schema(
|
||||
"input_nano".to_string(),
|
||||
ColumnDataType::TimestampNanosecond,
|
||||
ColumnDataType::TimestampMillisecond,
|
||||
SemanticType::Field,
|
||||
),
|
||||
common::make_column_schema(
|
||||
@@ -360,14 +363,12 @@ transform:
|
||||
|
||||
let output = common::parse_and_exec(test_input, pipeline_yaml);
|
||||
assert_eq!(output.schema, expected_schema);
|
||||
// this is actually wrong
|
||||
// TODO(shuiyisong): add check for type when converting epoch
|
||||
assert_eq!(
|
||||
output.rows[0].values[0].value_data,
|
||||
Some(ValueData::TimestampMillisecondValue(1722580862))
|
||||
Some(ValueData::TimestampMillisecondValue(1722580862000))
|
||||
);
|
||||
assert_eq!(
|
||||
output.rows[0].values[1].value_data,
|
||||
Some(ValueData::TimestampMillisecondValue(1722583122284583936))
|
||||
Some(ValueData::TimestampMillisecondValue(1722583122284))
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user