chore: set now as timestamp field default value (#5502)

* chore: set now as timestamp field default value

* chore: import pipeline default value
This commit is contained in:
localhost
2025-02-12 01:41:44 +08:00
committed by GitHub
parent 5f6f5e980a
commit beb9c0a797
4 changed files with 56 additions and 20 deletions

View File

@@ -17,6 +17,8 @@ pub mod transformer;
use std::collections::BTreeMap;
use snafu::OptionExt;
use crate::etl::error::{Error, Result};
use crate::etl::transform::index::Index;
use crate::etl::value::Value;
@@ -28,7 +30,6 @@ const TRANSFORM_INDEX: &str = "index";
const TRANSFORM_DEFAULT: &str = "default";
const TRANSFORM_ON_FAILURE: &str = "on_failure";
use snafu::OptionExt;
pub use transformer::greptime::GreptimeTransformer;
use super::error::{
@@ -37,6 +38,7 @@ use super::error::{
};
use super::field::Fields;
use super::processor::{yaml_new_field, yaml_new_fields, yaml_string};
use super::value::Timestamp;
pub trait Transformer: std::fmt::Debug + Sized + Send + Sync + 'static {
type Output;
@@ -166,6 +168,14 @@ impl Transform {
pub(crate) fn get_type_matched_default_val(&self) -> &Value {
&self.type_
}
/// Produces a fallback value for a transform whose input carried no data.
///
/// Returns `Some(Value::Timestamp(Timestamp::default()))` only when this
/// transform is timestamp-typed *and* its index is `Index::Time`; every other
/// transform yields `None`.
///
/// NOTE(review): `Timestamp::default()` is presumably "now" (per the commit
/// title) — confirm in `super::value`.
pub(crate) fn get_default_value_when_data_is_none(&self) -> Option<Value> {
    let is_time_index_timestamp = matches!(self.type_, Value::Timestamp(_))
        && self.index.is_some_and(|idx| idx == Index::Time);

    // The closure keeps the default lazily constructed, only when it applies.
    is_time_index_timestamp.then(|| Value::Timestamp(Timestamp::default()))
}
}
impl TryFrom<&yaml_rust::yaml::Hash> for Transform {
@@ -228,6 +238,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for Transform {
(_, _) => {
let target = type_.parse_str_value(default_value.to_str_value().as_str())?;
final_default = Some(target);
on_failure = Some(OnFailure::Default);
}
}
}

View File

@@ -84,9 +84,8 @@ impl GreptimePipelineParams {
impl GreptimeTransformer {
/// Add a default timestamp column to the transforms
fn add_greptime_timestamp_column(transforms: &mut Transforms) {
let ns = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0);
let type_ = Value::Timestamp(Timestamp::Nanosecond(ns));
let default = Some(type_.clone());
let type_ = Value::Timestamp(Timestamp::Nanosecond(0));
let default = None;
let transform = Transform {
fields: Fields::one(Field::new(
@@ -192,9 +191,17 @@ impl Transformer for GreptimeTransformer {
values[output_index] = GreptimeValue { value_data };
}
None => {
let default = transform.get_default();
let value_data = match default {
Some(default) => coerce_value(default, transform)?,
let value_data = match transform.on_failure {
Some(crate::etl::transform::OnFailure::Default) => {
match transform.get_default() {
Some(default) => coerce_value(default, transform)?,
None => match transform.get_default_value_when_data_is_none() {
Some(default) => coerce_value(&default, transform)?,
None => None,
},
}
}
Some(crate::etl::transform::OnFailure::Ignore) => None,
None => None,
};
values[output_index] = GreptimeValue { value_data };

View File

@@ -159,19 +159,7 @@ fn coerce_type(transform: &Transform) -> Result<(ColumnDataType, Option<ColumnDa
pub(crate) fn coerce_value(val: &Value, transform: &Transform) -> Result<Option<ValueData>> {
match val {
Value::Null => match &transform.default {
Some(default) => coerce_value(default, transform),
None => match transform.on_failure {
Some(OnFailure::Ignore) => Ok(None),
Some(OnFailure::Default) => transform
.get_default()
.map(|default| coerce_value(default, transform))
.unwrap_or_else(|| {
coerce_value(transform.get_type_matched_default_val(), transform)
}),
None => Ok(None),
},
},
Value::Null => Ok(None),
Value::Int8(n) => coerce_i64_value(*n as i64, transform),
Value::Int16(n) => coerce_i64_value(*n as i64, transform),

View File

@@ -785,6 +785,36 @@ transform:
assert_eq!(expected, r);
}
/// Runs a pipeline that declares no time-index transform and checks that every
/// nanosecond timestamp in the resulting row is close to the current time.
///
/// NOTE(review): this presumably relies on the transformer appending a default
/// timestamp column when none is configured — confirm against
/// `GreptimeTransformer`.
#[test]
fn test_timestamp_default_now() {
    let input_value = serde_json::json!({"abc": "hello world"});

    // `type` and `on_failure` must be nested under the `- field` list item;
    // at column 0 they would be parsed as unrelated top-level keys.
    let pipeline_yaml = r#"
processors:
transform:
  - field: abc
    type: string
    on_failure: default
"#;
    let yaml_content = Content::Yaml(pipeline_yaml);
    let pipeline: Pipeline<GreptimeTransformer> = parse(&yaml_content).unwrap();

    let mut status = json_to_intermediate_state(input_value).unwrap();
    let row = pipeline
        .exec_mut(&mut status)
        .unwrap()
        .into_transformed()
        .expect("expect transformed result ");

    // Count matching columns so the test cannot pass vacuously when the row
    // contains no timestamp at all.
    let mut timestamp_columns = 0;
    for column in row.values {
        if let ValueData::TimestampNanosecondValue(ts) = column.value_data.unwrap() {
            let now = chrono::Utc::now().timestamp_nanos_opt().unwrap();
            // The generated timestamp must be within 1 ms (in nanoseconds) of now.
            assert!(now - ts < 1_000_000);
            timestamp_columns += 1;
        }
    }
    assert!(
        timestamp_columns > 0,
        "expected at least one nanosecond timestamp column in the row"
    );
}
#[test]
fn test_dispatch() {
let input_value_str1 = r#"