diff --git a/src/pipeline/src/etl/transform.rs b/src/pipeline/src/etl/transform.rs index e3039d6c7a..a61444d945 100644 --- a/src/pipeline/src/etl/transform.rs +++ b/src/pipeline/src/etl/transform.rs @@ -17,6 +17,8 @@ pub mod transformer; use std::collections::BTreeMap; +use snafu::OptionExt; + use crate::etl::error::{Error, Result}; use crate::etl::transform::index::Index; use crate::etl::value::Value; @@ -28,7 +30,6 @@ const TRANSFORM_INDEX: &str = "index"; const TRANSFORM_DEFAULT: &str = "default"; const TRANSFORM_ON_FAILURE: &str = "on_failure"; -use snafu::OptionExt; pub use transformer::greptime::GreptimeTransformer; use super::error::{ @@ -37,6 +38,7 @@ use super::error::{ }; use super::field::Fields; use super::processor::{yaml_new_field, yaml_new_fields, yaml_string}; +use super::value::Timestamp; pub trait Transformer: std::fmt::Debug + Sized + Send + Sync + 'static { type Output; @@ -166,6 +168,14 @@ impl Transform { pub(crate) fn get_type_matched_default_val(&self) -> &Value { &self.type_ } + + pub(crate) fn get_default_value_when_data_is_none(&self) -> Option { + if matches!(self.type_, Value::Timestamp(_)) && self.index.is_some_and(|i| i == Index::Time) + { + return Some(Value::Timestamp(Timestamp::default())); + } + None + } } impl TryFrom<&yaml_rust::yaml::Hash> for Transform { @@ -228,6 +238,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for Transform { (_, _) => { let target = type_.parse_str_value(default_value.to_str_value().as_str())?; final_default = Some(target); + on_failure = Some(OnFailure::Default); } } } diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index eb8d0f8827..621acc7581 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -84,9 +84,8 @@ impl GreptimePipelineParams { impl GreptimeTransformer { /// Add a default timestamp column to the transforms fn add_greptime_timestamp_column(transforms: &mut Transforms) { - let ns = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0); - let type_ = Value::Timestamp(Timestamp::Nanosecond(ns)); - let default = Some(type_.clone()); + let type_ = Value::Timestamp(Timestamp::Nanosecond(0)); + let default = None; let transform = Transform { fields: Fields::one(Field::new( @@ -192,9 +191,17 @@ impl Transformer for GreptimeTransformer { values[output_index] = GreptimeValue { value_data }; } None => { - let default = transform.get_default(); - let value_data = match default { - Some(default) => coerce_value(default, transform)?, + let value_data = match transform.on_failure { + Some(crate::etl::transform::OnFailure::Default) => { + match transform.get_default() { + Some(default) => coerce_value(default, transform)?, + None => match transform.get_default_value_when_data_is_none() { + Some(default) => coerce_value(&default, transform)?, + None => None, + }, + } + } + Some(crate::etl::transform::OnFailure::Ignore) => None, None => None, }; values[output_index] = GreptimeValue { value_data }; diff --git a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs index da345b3bde..a796a816ec 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs @@ -159,19 +159,7 @@ fn coerce_type(transform: &Transform) -> Result<(ColumnDataType, Option Result> { match val { - Value::Null => match &transform.default { - Some(default) => coerce_value(default, transform), - None => match transform.on_failure { - Some(OnFailure::Ignore) => Ok(None), - Some(OnFailure::Default) => transform - .get_default() - .map(|default| coerce_value(default, transform)) - .unwrap_or_else(|| { - coerce_value(transform.get_type_matched_default_val(), transform) - }), - None => Ok(None), - }, - }, + Value::Null => Ok(None), Value::Int8(n) => coerce_i64_value(*n as i64, transform), Value::Int16(n) => coerce_i64_value(*n as i64, transform), diff --git a/src/pipeline/tests/pipeline.rs b/src/pipeline/tests/pipeline.rs index 7a170660a9..3f3a90c55f 100644 --- a/src/pipeline/tests/pipeline.rs +++ b/src/pipeline/tests/pipeline.rs @@ -785,6 +785,36 @@ transform: assert_eq!(expected, r); } +#[test] +fn test_timestamp_default_now() { + let input_value = serde_json::json!({"abc": "hello world"}); + + let pipeline_yaml = r#" +processors: +transform: + - field: abc + type: string + on_failure: default +"#; + + let yaml_content = Content::Yaml(pipeline_yaml); + let pipeline: Pipeline = parse(&yaml_content).unwrap(); + + let mut status = json_to_intermediate_state(input_value).unwrap(); + let row = pipeline + .exec_mut(&mut status) + .unwrap() + .into_transformed() + .expect("expect transformed result "); + + row.values.into_iter().for_each(|v| { + if let ValueData::TimestampNanosecondValue(v) = v.value_data.unwrap() { + let now = chrono::Utc::now().timestamp_nanos_opt().unwrap(); + assert!(now - v < 1_000_000); + } + }); +} + #[test] fn test_dispatch() { let input_value_str1 = r#"