diff --git a/src/pipeline/src/etl/transform/mod.rs b/src/pipeline/src/etl/transform/mod.rs index 991aa05df6..7b09bca965 100644 --- a/src/pipeline/src/etl/transform/mod.rs +++ b/src/pipeline/src/etl/transform/mod.rs @@ -27,6 +27,7 @@ const TRANSFORM_FIELDS: &str = "fields"; const TRANSFORM_TYPE: &str = "type"; const TRANSFORM_INDEX: &str = "index"; const TRANSFORM_DEFAULT: &str = "default"; +const TRANSFORM_ON_FAILURE: &str = "on_failure"; pub use transformer::greptime::GreptimeTransformer; // pub use transformer::noop::NoopTransformer; @@ -38,6 +39,38 @@ pub trait Transformer: std::fmt::Display + Sized + Send + Sync + 'static { fn transform(&self, val: crate::etl::value::Value) -> Result; } +/// On Failure behavior when transform fails +#[derive(Debug, Clone, Default)] +pub enum OnFailure { + // Return None if transform fails + #[default] + Ignore, + // Return default value of the field if transform fails + // Default value depends on the type of the field, or explicitly set by user + Default, +} + +impl std::str::FromStr for OnFailure { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "ignore" => Ok(OnFailure::Ignore), + "default" => Ok(OnFailure::Default), + _ => Err(format!("invalid transform on_failure value: {}", s)), + } + } +} + +impl std::fmt::Display for OnFailure { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + OnFailure::Ignore => write!(f, "ignore"), + OnFailure::Default => write!(f, "default"), + } + } +} + #[derive(Debug, Default, Clone)] pub struct Transforms { transforms: Vec, @@ -97,6 +130,8 @@ pub struct Transform { pub default: Option, pub index: Option, + + pub on_failure: Option, } impl std::fmt::Display for Transform { @@ -107,10 +142,21 @@ impl std::fmt::Display for Transform { "".to_string() }; - let fields = format!("field(s): {}", self.fields); let type_ = format!("type: {}", self.type_); + let fields = format!("field(s): {}", self.fields); + let default = if let Some(default) = &self.default { + format!(", default: {}", default) + } else { + "".to_string() + }; - write!(f, "{type_}{index}, {fields}") + let on_failure = if let Some(on_failure) = &self.on_failure { + format!(", on_failure: {}", on_failure) + } else { + "".to_string() + }; + + write!(f, "{type_}{index}, {fields}{default}{on_failure}",) } } @@ -121,6 +167,7 @@ impl Default for Transform { type_: Value::Null, default: None, index: None, + on_failure: None, } } } @@ -155,9 +202,17 @@ impl Transform { self.index = Some(index); } + fn with_on_failure(&mut self, on_failure: OnFailure) { + self.on_failure = Some(on_failure); + } + pub(crate) fn get_default(&self) -> Option<&Value> { self.default.as_ref() } + + pub(crate) fn get_type_matched_default_val(&self) -> &Value { + &self.type_ + } } impl TryFrom<&yaml_rust::yaml::Hash> for Transform { @@ -192,6 +247,12 @@ impl TryFrom<&yaml_rust::yaml::Hash> for Transform { TRANSFORM_DEFAULT => { default_opt = Some(Value::try_from(v)?); } + + TRANSFORM_ON_FAILURE => { + let on_failure = yaml_string(v, TRANSFORM_ON_FAILURE)?; + transform.with_on_failure(on_failure.parse()?); + } + _ => {} } } diff --git a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs index 6b077a22dc..49e008e438 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs @@ -16,7 +16,7 @@ use greptime_proto::v1::value::ValueData; use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType}; use crate::etl::transform::index::Index; -use crate::etl::transform::Transform; +use crate::etl::transform::{OnFailure, Transform}; use crate::etl::value::{Epoch, Time, Value}; impl TryFrom for ValueData { @@ -177,8 +177,20 @@ fn coerce_bool_value(b: bool, transform: &Transform) -> Result Value::Boolean(_) => ValueData::BoolValue(b), Value::String(_) => ValueData::StringValue(b.to_string()), - Value::Time(_) => return Err("Boolean type not supported for Time".to_string()), - Value::Epoch(_) => return Err("Boolean type not supported for Epoch".to_string()), + Value::Time(_) => match transform.on_failure { + Some(OnFailure::Ignore) => return Ok(None), + Some(OnFailure::Default) => { + return Err("default value not supported for Time".to_string()) + } + None => return Err("Boolean type not supported for Time".to_string()), + }, + Value::Epoch(_) => match transform.on_failure { + Some(OnFailure::Ignore) => return Ok(None), + Some(OnFailure::Default) => { + return Err("default value not supported for Epoch".to_string()) + } + None => return Err("Boolean type not supported for Epoch".to_string()), + }, Value::Array(_) => unimplemented!("Array type not supported"), Value::Map(_) => unimplemented!("Object type not supported"), @@ -207,8 +219,21 @@ fn coerce_i64_value(n: i64, transform: &Transform) -> Result, Value::Boolean(_) => ValueData::BoolValue(n != 0), Value::String(_) => ValueData::StringValue(n.to_string()), - Value::Time(_) => return Err("Integer type not supported for Time".to_string()), - Value::Epoch(_) => return Err("Integer type not supported for Epoch".to_string()), + Value::Time(_) => match transform.on_failure { + Some(OnFailure::Ignore) => return Ok(None), + Some(OnFailure::Default) => { + return Err("default value not supported for Time".to_string()) + } + None => return Err("Integer type not supported for Time".to_string()), + }, + + Value::Epoch(_) => match transform.on_failure { + Some(OnFailure::Ignore) => return Ok(None), + Some(OnFailure::Default) => { + return Err("default value not supported for Epoch".to_string()) + } + None => return Err("Integer type not supported for Epoch".to_string()), + }, Value::Array(_) => unimplemented!("Array type not supported"), Value::Map(_) => unimplemented!("Object type not supported"), @@ -237,8 +262,21 @@ fn coerce_u64_value(n: u64, transform: &Transform) -> Result, Value::Boolean(_) => ValueData::BoolValue(n != 0), Value::String(_) => ValueData::StringValue(n.to_string()), - Value::Time(_) => return Err("Integer type not supported for Time".to_string()), - Value::Epoch(_) => return Err("Integer type not supported for Epoch".to_string()), + Value::Time(_) => match transform.on_failure { + Some(OnFailure::Ignore) => return Ok(None), + Some(OnFailure::Default) => { + return Err("default value not supported for Time".to_string()) + } + None => return Err("Integer type not supported for Time".to_string()), + }, + + Value::Epoch(_) => match transform.on_failure { + Some(OnFailure::Ignore) => return Ok(None), + Some(OnFailure::Default) => { + return Err("default value not supported for Epoch".to_string()) + } + None => return Err("Integer type not supported for Epoch".to_string()), + }, Value::Array(_) => unimplemented!("Array type not supported"), Value::Map(_) => unimplemented!("Object type not supported"), @@ -267,8 +305,21 @@ fn coerce_f64_value(n: f64, transform: &Transform) -> Result, Value::Boolean(_) => ValueData::BoolValue(n != 0.0), Value::String(_) => ValueData::StringValue(n.to_string()), - Value::Time(_) => return Err("Float type not supported for Time".to_string()), - Value::Epoch(_) => return Err("Float type not supported for Epoch".to_string()), + Value::Time(_) => match transform.on_failure { + Some(OnFailure::Ignore) => return Ok(None), + Some(OnFailure::Default) => { + return Err("default value not supported for Time".to_string()) + } + None => return Err("Float type not supported for Time".to_string()), + }, + + Value::Epoch(_) => match transform.on_failure { + Some(OnFailure::Ignore) => return Ok(None), + Some(OnFailure::Default) => { + return Err("default value not supported for Epoch".to_string()) + } + None => return Err("Float type not supported for Epoch".to_string()), + }, Value::Array(_) => unimplemented!("Array type not supported"), Value::Map(_) => unimplemented!("Object type not supported"), @@ -280,31 +331,156 @@ fn coerce_f64_value(n: f64, transform: &Transform) -> Result, } fn coerce_string_value(s: &str, transform: &Transform) -> Result, String> { - let val = match transform.type_ { - Value::Int8(_) => ValueData::I8Value(s.parse::().map_err(|e| e.to_string())?), - Value::Int16(_) => ValueData::I16Value(s.parse::().map_err(|e| e.to_string())?), - Value::Int32(_) => ValueData::I32Value(s.parse::().map_err(|e| e.to_string())?), - Value::Int64(_) => ValueData::I64Value(s.parse::().map_err(|e| e.to_string())?), + match transform.type_ { + Value::Int8(_) if s.parse::().is_ok() => { + Ok(Some(ValueData::I8Value(s.parse().unwrap()))) + } + Value::Int16(_) if s.parse::().is_ok() => { + Ok(Some(ValueData::I16Value(s.parse().unwrap()))) + } + Value::Int32(_) if s.parse::().is_ok() => { + Ok(Some(ValueData::I32Value(s.parse().unwrap()))) + } + Value::Int64(_) if s.parse::().is_ok() => { + Ok(Some(ValueData::I64Value(s.parse().unwrap()))) + } - Value::Uint8(_) => ValueData::U8Value(s.parse::().map_err(|e| e.to_string())?), - Value::Uint16(_) => ValueData::U16Value(s.parse::().map_err(|e| e.to_string())?), - Value::Uint32(_) => ValueData::U32Value(s.parse::().map_err(|e| e.to_string())?), - Value::Uint64(_) => ValueData::U64Value(s.parse::().map_err(|e| e.to_string())?), + Value::Uint8(_) if s.parse::().is_ok() => { + Ok(Some(ValueData::U8Value(s.parse().unwrap()))) + } + Value::Uint16(_) if s.parse::().is_ok() => { + Ok(Some(ValueData::U16Value(s.parse().unwrap()))) + } + Value::Uint32(_) if s.parse::().is_ok() => { + Ok(Some(ValueData::U32Value(s.parse().unwrap()))) + } + Value::Uint64(_) if s.parse::().is_ok() => { + Ok(Some(ValueData::U64Value(s.parse().unwrap()))) + } - Value::Float32(_) => ValueData::F32Value(s.parse::().map_err(|e| e.to_string())?), - Value::Float64(_) => ValueData::F64Value(s.parse::().map_err(|e| e.to_string())?), + Value::Float32(_) if s.parse::().is_ok() => { + Ok(Some(ValueData::F32Value(s.parse().unwrap()))) + } + Value::Float64(_) if s.parse::().is_ok() => { + Ok(Some(ValueData::F64Value(s.parse().unwrap()))) + } - Value::Boolean(_) => ValueData::BoolValue(s.parse::().map_err(|e| e.to_string())?), - Value::String(_) => ValueData::StringValue(s.to_string()), + Value::Boolean(_) if s.parse::().is_ok() => { + Ok(Some(ValueData::BoolValue(s.parse().unwrap()))) + } - Value::Time(_) => return Err("String type not supported for Time".to_string()), - Value::Epoch(_) => return Err("String type not supported for Epoch".to_string()), + // on_failure + Value::Int8(_) + | Value::Int16(_) + | Value::Int32(_) + | Value::Int64(_) + | Value::Uint8(_) + | Value::Uint16(_) + | Value::Uint32(_) + | Value::Uint64(_) + | Value::Float32(_) + | Value::Float64(_) + | Value::Boolean(_) => match transform.on_failure { + Some(OnFailure::Ignore) => Ok(None), + Some(OnFailure::Default) => match transform.get_default() { + Some(default) => coerce_value(default, transform), + None => coerce_value(transform.get_type_matched_default_val(), transform), + }, + None => Err(format!( + "failed to coerce string value '{s}' to type '{}'", + transform.type_.to_str_type() + )), + }, + + Value::String(_) => Ok(Some(ValueData::StringValue(s.to_string()))), + + Value::Time(_) => match transform.on_failure { + Some(OnFailure::Ignore) => Ok(None), + Some(OnFailure::Default) => Err("default value not supported for Time".to_string()), + None => Err("String type not supported for Time".to_string()), + }, + + Value::Epoch(_) => match transform.on_failure { + Some(OnFailure::Ignore) => Ok(None), + Some(OnFailure::Default) => Err("default value not supported for Epoch".to_string()), + None => Err("String type not supported for Epoch".to_string()), + }, Value::Array(_) => unimplemented!("Array type not supported"), Value::Map(_) => unimplemented!("Object type not supported"), - Value::Null => return Ok(None), - }; - - Ok(Some(val)) + Value::Null => Ok(None), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::etl::field::Fields; + + #[test] + fn test_coerce_string_without_on_failure() { + let transform = Transform { + fields: Fields::default(), + type_: Value::Int32(0), + default: None, + index: None, + on_failure: None, + }; + + // valid string + { + let val = Value::String("123".to_string()); + let result = coerce_value(&val, &transform).unwrap(); + assert_eq!(result, Some(ValueData::I32Value(123))); + } + + // invalid string + { + let val = Value::String("hello".to_string()); + let result = coerce_value(&val, &transform); + assert!(result.is_err()); + } + } + + #[test] + fn test_coerce_string_with_on_failure_ignore() { + let transform = Transform { + fields: Fields::default(), + type_: Value::Int32(0), + default: None, + index: None, + on_failure: Some(OnFailure::Ignore), + }; + + let val = Value::String("hello".to_string()); + let result = coerce_value(&val, &transform).unwrap(); + assert_eq!(result, None); + } + + #[test] + fn test_coerce_string_with_on_failure_default() { + let mut transform = Transform { + fields: Fields::default(), + type_: Value::Int32(0), + default: None, + index: None, + on_failure: Some(OnFailure::Default), + }; + + // with no explicit default value + { + let val = Value::String("hello".to_string()); + let result = coerce_value(&val, &transform).unwrap(); + assert_eq!(result, Some(ValueData::I32Value(0))); + } + + // with explicit default value + { + transform.default = Some(Value::Int32(42)); + let val = Value::String("hello".to_string()); + let result = coerce_value(&val, &transform).unwrap(); + assert_eq!(result, Some(ValueData::I32Value(42))); + } + } } diff --git a/src/pipeline/src/etl/transform/transformer/greptime/mod.rs b/src/pipeline/src/etl/transform/transformer/greptime/mod.rs index bbbfa0e910..6134657c44 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime/mod.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime/mod.rs @@ -47,6 +47,7 @@ impl GreptimeTransformer { type_, default, index: Some(Index::Timestamp), + on_failure: None, } } diff --git a/src/pipeline/src/etl/value/mod.rs b/src/pipeline/src/etl/value/mod.rs index a8daa5fa61..01b7c43fd9 100644 --- a/src/pipeline/src/etl/value/mod.rs +++ b/src/pipeline/src/etl/value/mod.rs @@ -193,6 +193,34 @@ impl Value { v => v.to_string(), } } + + pub fn to_str_type(&self) -> &str { + match self { + Value::Int8(_) => "int8", + Value::Int16(_) => "int16", + Value::Int32(_) => "int32", + Value::Int64(_) => "int64", + + Value::Uint8(_) => "uint8", + Value::Uint16(_) => "uint16", + Value::Uint32(_) => "uint32", + Value::Uint64(_) => "uint64", + + Value::Float32(_) => "float32", + Value::Float64(_) => "float64", + + Value::Boolean(_) => "boolean", + Value::String(_) => "string", + + Value::Time(_) => "time", + Value::Epoch(_) => "epoch", + + Value::Array(_) => "array", + Value::Map(_) => "map", + + Value::Null => "null", + } + } } impl std::fmt::Display for Value { diff --git a/src/pipeline/tests/on_failure.rs b/src/pipeline/tests/on_failure.rs new file mode 100644 index 0000000000..4934048e19 --- /dev/null +++ b/src/pipeline/tests/on_failure.rs @@ -0,0 +1,224 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use greptime_proto::v1::value::ValueData::{U16Value, U8Value}; +use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType}; +use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Value}; + +#[test] +fn test_on_failure_with_ignore() { + let input_value_str = r#" + [ + { + "version": "-" + } + ] +"#; + let input_value: Value = serde_json::from_str::(input_value_str) + .expect("failed to parse input value") + .try_into() + .expect("failed to convert input value"); + + let pipeline_yaml = r#" +--- +description: Pipeline for Testing on-failure + +transform: + - fields: + - version + type: uint8 + on_failure: ignore +"#; + + let yaml_content = Content::Yaml(pipeline_yaml.into()); + let pipeline: Pipeline = + parse(&yaml_content).expect("failed to parse pipeline"); + let output = pipeline.exec(input_value).expect("failed to exec pipeline"); + + let expected_schema = vec![ + ColumnSchema { + column_name: "version".to_string(), + datatype: ColumnDataType::Uint8.into(), + semantic_type: SemanticType::Field.into(), + datatype_extension: None, + }, + ColumnSchema { + column_name: "greptime_timestamp".to_string(), + datatype: ColumnDataType::TimestampNanosecond.into(), + semantic_type: SemanticType::Timestamp.into(), + datatype_extension: None, + }, + ]; + + assert_eq!(output.schema, expected_schema); + assert_eq!(output.rows[0].values[0].value_data, None); +} + +#[test] +fn test_on_failure_with_default() { + let input_value_str = r#" + [ + { + "version": "-" + } + ] +"#; + let input_value: Value = serde_json::from_str::(input_value_str) + .expect("failed to parse input value") + .try_into() + .expect("failed to convert input value"); + + let pipeline_yaml = r#" +--- +description: Pipeline for Testing on-failure + +transform: + - fields: + - version + type: uint8 + default: 0 + on_failure: default +"#; + + let yaml_content = Content::Yaml(pipeline_yaml.into()); + let pipeline: Pipeline = + parse(&yaml_content).expect("failed to parse pipeline"); + let output = pipeline.exec(input_value).expect("failed to exec pipeline"); + + let expected_schema = vec![ + ColumnSchema { + column_name: "version".to_string(), + datatype: ColumnDataType::Uint8.into(), + semantic_type: SemanticType::Field.into(), + datatype_extension: None, + }, + ColumnSchema { + column_name: "greptime_timestamp".to_string(), + datatype: ColumnDataType::TimestampNanosecond.into(), + semantic_type: SemanticType::Timestamp.into(), + datatype_extension: None, + }, + ]; + + assert_eq!(output.schema, expected_schema); + assert_eq!(output.rows[0].values[0].value_data, Some(U8Value(0))); +} + +#[test] +fn test_default() { + let input_value_str = r#" + [{}] +"#; + let input_value: Value = serde_json::from_str::(input_value_str) + .expect("failed to parse input value") + .try_into() + .expect("failed to convert input value"); + + let pipeline_yaml = r#" +--- +description: Pipeline for Testing on-failure + +transform: + - fields: + - version + type: uint8 + default: 0 +"#; + + let yaml_content = Content::Yaml(pipeline_yaml.into()); + let pipeline: Pipeline = + parse(&yaml_content).expect("failed to parse pipeline"); + let output = pipeline.exec(input_value).expect("failed to exec pipeline"); + + let expected_schema = vec![ + ColumnSchema { + column_name: "version".to_string(), + datatype: ColumnDataType::Uint8.into(), + semantic_type: SemanticType::Field.into(), + datatype_extension: None, + }, + ColumnSchema { + column_name: "greptime_timestamp".to_string(), + datatype: ColumnDataType::TimestampNanosecond.into(), + semantic_type: SemanticType::Timestamp.into(), + datatype_extension: None, + }, + ]; + + assert_eq!(output.schema, expected_schema); + assert_eq!(output.rows[0].values[0].value_data, Some(U8Value(0))); +} + +#[test] +fn test_multiple_on_failure() { + let input_value_str = r#" + [ + { + "version": "-", + "spec_version": "-" + } + ] +"#; + let input_value: Value = serde_json::from_str::(input_value_str) + .expect("failed to parse input value") + .try_into() + .expect("failed to convert input value"); + + let pipeline_yaml = r#" +--- +description: Pipeline for Testing on-failure + +transform: + - fields: + - version + type: uint8 + default: 0 + on_failure: default + - fields: + - spec_version + type: uint16 + default: 0 + on_failure: default +"#; + + let yaml_content = Content::Yaml(pipeline_yaml.into()); + let pipeline: Pipeline = + parse(&yaml_content).expect("failed to parse pipeline"); + let output = pipeline.exec(input_value).expect("failed to exec pipeline"); + + let expected_schema = vec![ + ColumnSchema { + column_name: "version".to_string(), + datatype: ColumnDataType::Uint8.into(), + semantic_type: SemanticType::Field.into(), + datatype_extension: None, + }, + ColumnSchema { + column_name: "spec_version".to_string(), + datatype: ColumnDataType::Uint16.into(), + semantic_type: SemanticType::Field.into(), + datatype_extension: None, + }, + ColumnSchema { + column_name: "greptime_timestamp".to_string(), + datatype: ColumnDataType::TimestampNanosecond.into(), + semantic_type: SemanticType::Timestamp.into(), + datatype_extension: None, + }, + ]; + + assert_eq!(output.schema, expected_schema); + assert_eq!(output.rows[0].values[0].value_data, Some(U8Value(0))); + assert_eq!(output.rows[0].values[1].value_data, Some(U16Value(0))); +} diff --git a/src/pipeline/tests/pipeline.rs b/src/pipeline/tests/pipeline.rs index 869bd13c78..ff9cad1bde 100644 --- a/src/pipeline/tests/pipeline.rs +++ b/src/pipeline/tests/pipeline.rs @@ -19,12 +19,8 @@ use greptime_proto::v1::value::ValueData::{ use greptime_proto::v1::Value as GreptimeValue; use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Value}; -// use pipeline::transform::GreptimeTransformer; -// use pipeline::value::Value; -// use pipeline::{parse, Content, Pipeline}; - #[test] -fn main() { +fn test_complex_data() { let input_value_str = r#" [ {