diff --git a/src/pipeline/benches/processor.rs b/src/pipeline/benches/processor.rs index e9aefefb63..3f11cc39d1 100644 --- a/src/pipeline/benches/processor.rs +++ b/src/pipeline/benches/processor.rs @@ -12,13 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use criterion::{black_box, criterion_group, criterion_main, Criterion}; use pipeline::error::Result; -use pipeline::{json_to_map, parse, Content, Pipeline}; +use pipeline::{ + json_to_map, parse, setup_pipeline, Content, Pipeline, PipelineContext, SchemaInfo, +}; use serde_json::{Deserializer, Value}; fn processor_mut( - pipeline: &Pipeline, + pipeline: Arc, + pipeline_ctx: &PipelineContext<'_>, + schema_info: &mut SchemaInfo, input_values: Vec, ) -> Result> { let mut result = Vec::with_capacity(input_values.len()); @@ -26,7 +32,7 @@ fn processor_mut( for v in input_values { let payload = json_to_map(v).unwrap(); let r = pipeline - .exec_mut(payload)? + .exec_mut(payload, pipeline_ctx, schema_info)? .into_transformed() .expect("expect transformed result "); result.push(r.0); @@ -235,11 +241,25 @@ fn criterion_benchmark(c: &mut Criterion) { .collect::, _>>() .unwrap(); let pipeline = prepare_pipeline(); + + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); + let mut group = c.benchmark_group("pipeline"); group.sample_size(50); group.bench_function("processor mut", |b| { b.iter(|| { - processor_mut(black_box(&pipeline), black_box(input_value.clone())).unwrap(); + processor_mut( + black_box(pipeline.clone()), + black_box(&pipeline_ctx), + black_box(&mut schema_info), + black_box(input_value.clone()), + ) + .unwrap(); }) }); group.finish(); diff --git a/src/pipeline/src/error.rs b/src/pipeline/src/error.rs index bd6bfa6847..1ded1064f9 100644 --- a/src/pipeline/src/error.rs +++ b/src/pipeline/src/error.rs @@ -235,14 +235,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Processor {processor}: invalid format {s}"))] - DateInvalidFormat { - s: String, - processor: String, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Invalid Pattern: '{s}'. {detail}"))] DissectInvalidPattern { s: String, @@ -398,10 +390,14 @@ pub enum Error { #[snafu(implicit)] location: Location, }, - #[snafu(display("Transform {fields:?} type MUST BE set before default {default}"))] + #[snafu(display("Transform fields must be set."))] + TransformFieldMustBeSet { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Transform {fields:?} type MUST BE set."))] TransformTypeMustBeSet { fields: String, - default: String, #[snafu(implicit)] location: Location, }, @@ -426,11 +422,17 @@ pub enum Error { #[snafu(implicit)] location: Location, }, - #[snafu(display("Exactly one time-related processor and one timestamp value is required to use auto transform"))] + #[snafu(display("Exactly one time-related processor and one timestamp value is required to use auto transform. 
`ignore_missing` can not be set to true."))] AutoTransformOneTimestamp { #[snafu(implicit)] location: Location, }, + #[snafu(display("Invalid Pipeline doc version number: {}", version))] + InvalidVersionNumber { + version: String, + #[snafu(implicit)] + location: Location, + }, #[snafu(display("Null type not supported"))] CoerceUnsupportedNullType { #[snafu(implicit)] @@ -857,7 +859,6 @@ impl ErrorExt for Error { | DateParse { .. } | DateFailedToGetLocalTimezone { .. } | DateFailedToGetTimestamp { .. } - | DateInvalidFormat { .. } | DissectInvalidPattern { .. } | DissectEmptyPattern { .. } | DissectSplitExceedsInput { .. } @@ -883,11 +884,13 @@ impl ErrorExt for Error { | UrlEncodingDecode { .. } | TransformOnFailureInvalidValue { .. } | TransformElementMustBeMap { .. } + | TransformFieldMustBeSet { .. } | TransformTypeMustBeSet { .. } | TransformColumnNameMustBeUnique { .. } | TransformMultipleTimestampIndex { .. } | TransformTimestampIndexCount { .. } | AutoTransformOneTimestamp { .. } + | InvalidVersionNumber { .. } | CoerceUnsupportedNullType { .. } | CoerceUnsupportedNullTypeTo { .. } | CoerceUnsupportedEpochType { .. } diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs index d2b48bc818..1d80d436aa 100644 --- a/src/pipeline/src/etl.rs +++ b/src/pipeline/src/etl.rs @@ -21,26 +21,27 @@ pub mod value; use std::collections::BTreeMap; -use ahash::{HashMap, HashMapExt}; use api::v1::Row; use common_time::timestamp::TimeUnit; +use itertools::Itertools; use processor::{Processor, Processors}; use snafu::{ensure, OptionExt, ResultExt}; use transform::Transforms; use value::Value; -use yaml_rust::YamlLoader; +use yaml_rust::{Yaml, YamlLoader}; use crate::dispatcher::{Dispatcher, Rule}; use crate::error::{ - AutoTransformOneTimestampSnafu, InputValueMustBeObjectSnafu, IntermediateKeyIndexSnafu, Result, - ValueMustBeMapSnafu, YamlLoadSnafu, YamlParseSnafu, + AutoTransformOneTimestampSnafu, Error, InputValueMustBeObjectSnafu, IntermediateKeyIndexSnafu, + InvalidVersionNumberSnafu, Result, YamlLoadSnafu, YamlParseSnafu, }; -use crate::etl::ctx_req::TABLE_SUFFIX_KEY; use crate::etl::processor::ProcessorKind; +use crate::etl::transform::transformer::greptime::values_to_row; use crate::tablesuffix::TableSuffixTemplate; -use crate::{ContextOpt, GreptimeTransformer}; +use crate::{ContextOpt, GreptimeTransformer, IdentityTimeIndex, PipelineContext, SchemaInfo}; const DESCRIPTION: &str = "description"; +const DOC_VERSION: &str = "version"; const PROCESSORS: &str = "processors"; const TRANSFORM: &str = "transform"; const TRANSFORMS: &str = "transforms"; @@ -63,6 +64,8 @@ pub fn parse(input: &Content) -> Result { let description = doc[DESCRIPTION].as_str().map(|s| s.to_string()); + let doc_version = (&doc[DOC_VERSION]).try_into()?; + let processors = if let Some(v) = doc[PROCESSORS].as_vec() { v.try_into()? 
} else { @@ -82,16 +85,31 @@ pub fn parse(input: &Content) -> Result { let cnt = processors .iter() .filter_map(|p| match p { - ProcessorKind::Date(d) => Some(d.target_count()), - ProcessorKind::Timestamp(t) => Some(t.target_count()), - ProcessorKind::Epoch(e) => Some(e.target_count()), + ProcessorKind::Date(d) if !d.ignore_missing() => Some( + d.fields + .iter() + .map(|f| (f.target_or_input_field(), TimeUnit::Nanosecond)) + .collect_vec(), + ), + ProcessorKind::Epoch(e) if !e.ignore_missing() => Some( + e.fields + .iter() + .map(|f| (f.target_or_input_field(), (&e.resolution).into())) + .collect_vec(), + ), _ => None, }) - .sum::(); - ensure!(cnt == 1, AutoTransformOneTimestampSnafu); - None + .flatten() + .collect_vec(); + ensure!(cnt.len() == 1, AutoTransformOneTimestampSnafu); + + let (ts_name, timeunit) = cnt.first().unwrap(); + TransformerMode::AutoTransform(ts_name.to_string(), *timeunit) } else { - Some(GreptimeTransformer::new(transformers)?) + TransformerMode::GreptimeTransformer(GreptimeTransformer::new( + transformers, + &doc_version, + )?) }; let dispatcher = if !doc[DISPATCHER].is_badvalue() { @@ -107,6 +125,7 @@ pub fn parse(input: &Content) -> Result { }; Ok(Pipeline { + doc_version, description, processors, transformer, @@ -118,15 +137,70 @@ pub fn parse(input: &Content) -> Result { } } +#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)] +pub enum PipelineDocVersion { + /// 1. All fields meant to be preserved have to explicitly set in the transform section. + /// 2. Or no transform is set, then the auto-transform will be used. + #[default] + V1, + + /// A combination of transform and auto-transform. + /// First it goes through the transform section, + /// then use auto-transform to set the rest fields. + /// + /// This is useful if you only want to set the index field, + /// and let the normal fields be auto-inferred. 
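+    ///
+    /// A minimal illustrative sketch of a v2 pipeline doc (field names are
+    /// illustrative, borrowing the epoch/transform syntax used in this crate's
+    /// tests; the `version` key maps to this enum):
+    ///
+    /// ```yaml
+    /// version: 2
+    /// processors:
+    ///   - epoch:
+    ///       field: ts
+    ///       resolution: ns
+    /// transform:
+    ///   # only the time index is declared; remaining fields are auto-inferred
+    ///   - field: ts
+    ///     type: timestamp, ns
+    ///     index: time
+    /// ```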
+ V2, +} + +impl TryFrom<&Yaml> for PipelineDocVersion { + type Error = Error; + + fn try_from(value: &Yaml) -> Result { + if value.is_badvalue() || value.is_null() { + return Ok(PipelineDocVersion::V1); + } + + let version = match value { + Yaml::String(s) => s + .parse::() + .map_err(|_| InvalidVersionNumberSnafu { version: s.clone() }.build())?, + Yaml::Integer(i) => *i, + _ => { + return InvalidVersionNumberSnafu { + version: value.as_str().unwrap_or_default().to_string(), + } + .fail(); + } + }; + + match version { + 1 => Ok(PipelineDocVersion::V1), + 2 => Ok(PipelineDocVersion::V2), + _ => InvalidVersionNumberSnafu { + version: version.to_string(), + } + .fail(), + } + } +} + #[derive(Debug)] pub struct Pipeline { + doc_version: PipelineDocVersion, description: Option, processors: processor::Processors, dispatcher: Option, - transformer: Option, + transformer: TransformerMode, tablesuffix: Option, } +#[derive(Debug, Clone)] +pub enum TransformerMode { + GreptimeTransformer(GreptimeTransformer), + AutoTransform(String, TimeUnit), +} + /// Where the pipeline executed is dispatched to, with context information #[derive(Debug, Hash, PartialEq, Eq, Clone, PartialOrd, Ord)] pub struct DispatchedTo { @@ -154,7 +228,6 @@ impl DispatchedTo { #[derive(Debug)] pub enum PipelineExecOutput { Transformed(TransformedOutput), - AutoTransform(AutoTransformOutput), DispatchedTo(DispatchedTo, Value), } @@ -163,15 +236,6 @@ pub struct TransformedOutput { pub opt: ContextOpt, pub row: Row, pub table_suffix: Option, - pub pipeline_map: Value, -} - -#[derive(Debug)] -pub struct AutoTransformOutput { - pub table_suffix: Option, - // ts_column_name -> unit - pub ts_unit_map: HashMap, - pub pipeline_map: Value, } impl PipelineExecOutput { @@ -232,7 +296,16 @@ pub fn simd_json_array_to_map(val: Vec) -> Result Result { + fn is_v1(&self) -> bool { + self.doc_version == PipelineDocVersion::V1 + } + + pub fn exec_mut( + &self, + mut val: Value, + pipeline_ctx: &PipelineContext<'_>, + schema_info: &mut SchemaInfo, + ) -> Result { // process for processor in self.processors.iter() { val = processor.exec_mut(val)?; @@ -243,51 +316,62 @@ impl Pipeline { return Ok(PipelineExecOutput::DispatchedTo(rule.into(), val)); } - // do transform - if let Some(transformer) = self.transformer() { - let (mut opt, row) = transformer.transform_mut(&mut val)?; - let table_suffix = opt.resolve_table_suffix(self.tablesuffix.as_ref(), &val); + // extract the options first + // this might be a breaking change, for table_suffix is now right after the processors + let mut opt = ContextOpt::from_pipeline_map_to_opt(&mut val)?; + let table_suffix = opt.resolve_table_suffix(self.tablesuffix.as_ref(), &val); - Ok(PipelineExecOutput::Transformed(TransformedOutput { - opt, - row, - table_suffix, - pipeline_map: val, - })) - } else { - // check table suffix var - let table_suffix = val - .remove(TABLE_SUFFIX_KEY) - .map(|f| f.to_str_value()) - .or_else(|| self.tablesuffix.as_ref().and_then(|t| t.apply(&val))); - - let mut ts_unit_map = HashMap::with_capacity(4); - // get all ts values - for (k, v) in val.as_map_mut().context(ValueMustBeMapSnafu)? 
{ - if let Value::Timestamp(ts) = v { - if !ts_unit_map.contains_key(k) { - ts_unit_map.insert(k.clone(), ts.get_unit()); - } + let row = match self.transformer() { + TransformerMode::GreptimeTransformer(greptime_transformer) => { + let values = greptime_transformer.transform_mut(&mut val, self.is_v1())?; + if self.is_v1() { + // v1 dont combine with auto-transform + // so return immediately + return Ok(PipelineExecOutput::Transformed(TransformedOutput { + opt, + row: Row { values }, + table_suffix, + })); } + // continue v2 process, check ts column and set the rest fields with auto-transform + // if transformer presents, then ts has been set + values_to_row(schema_info, val, pipeline_ctx, Some(values))? } - Ok(PipelineExecOutput::AutoTransform(AutoTransformOutput { - table_suffix, - ts_unit_map, - pipeline_map: val, - })) - } + TransformerMode::AutoTransform(ts_name, time_unit) => { + // infer ts from the context + // we've check that only one timestamp should exist + + // Create pipeline context with the found timestamp + let def = crate::PipelineDefinition::GreptimeIdentityPipeline(Some( + IdentityTimeIndex::Epoch(ts_name.to_string(), *time_unit, false), + )); + let n_ctx = + PipelineContext::new(&def, pipeline_ctx.pipeline_param, pipeline_ctx.channel); + values_to_row(schema_info, val, &n_ctx, None)? + } + }; + + Ok(PipelineExecOutput::Transformed(TransformedOutput { + opt, + row, + table_suffix, + })) } pub fn processors(&self) -> &processor::Processors { &self.processors } - pub fn transformer(&self) -> Option<&GreptimeTransformer> { - self.transformer.as_ref() + pub fn transformer(&self) -> &TransformerMode { + &self.transformer } + // the method is for test purpose pub fn schemas(&self) -> Option<&Vec> { - self.transformer.as_ref().map(|t| t.schemas()) + match &self.transformer { + TransformerMode::GreptimeTransformer(t) => Some(t.schemas()), + TransformerMode::AutoTransform(_, _) => None, + } } } @@ -298,8 +382,35 @@ pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str .context(IntermediateKeyIndexSnafu { kind, key }) } +/// This macro is test only, do not use it in production. +/// The schema_info cannot be used in auto-transform ts-infer mode for lacking the ts schema. +/// +/// Usage: +/// ```rust +/// let (pipeline, schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); +/// let pipeline_ctx = PipelineContext::new(&pipeline_def, &pipeline_param, Channel::Unknown); +/// ``` +#[macro_export] +macro_rules! 
setup_pipeline { + ($pipeline:expr) => {{ + use std::sync::Arc; + + use $crate::{GreptimePipelineParams, Pipeline, PipelineDefinition, SchemaInfo}; + + let pipeline: Arc = Arc::new($pipeline); + let schema = pipeline.schemas().unwrap(); + let schema_info = SchemaInfo::from_schema_list(schema.clone()); + + let pipeline_def = PipelineDefinition::Resolved(pipeline.clone()); + let pipeline_param = GreptimePipelineParams::default(); + + (pipeline, schema_info, pipeline_def, pipeline_param) + }}; +} #[cfg(test)] mod tests { + use std::sync::Arc; + use api::v1::Rows; use greptime_proto::v1::value::ValueData; use greptime_proto::v1::{self, ColumnDataType, SemanticType}; @@ -311,7 +422,8 @@ mod tests { let input_value_str = r#" { "my_field": "1,2", - "foo": "bar" + "foo": "bar", + "ts": "1" } "#; let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap(); @@ -321,16 +433,30 @@ processors: - csv: field: my_field target_fields: field1, field2 + - epoch: + field: ts + resolution: ns transform: - field: field1 type: uint32 - field: field2 type: uint32 + - field: ts + type: timestamp, ns + index: time "#; + let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); + let payload = json_to_map(input_value).unwrap(); let result = pipeline - .exec_mut(payload) + .exec_mut(payload, &pipeline_ctx, &mut schema_info) .unwrap() .into_transformed() .unwrap(); @@ -339,7 +465,7 @@ transform: assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2))); match &result.0.values[2].value_data { Some(ValueData::TimestampNanosecondValue(v)) => { - assert_ne!(*v, 0); + assert_ne!(v, &0); } _ => panic!("expect null value"), } @@ -354,7 +480,7 @@ transform: - message patterns: - "%{ip} %{?ignored} %{username} [%{ts}] \"%{method} %{path} %{proto}\" %{status} %{bytes}" - - timestamp: + - date: fields: - ts formats: @@ -378,17 +504,31 @@ transform: type: timestamp, ns index: time"#; let pipeline: Pipeline = parse(&Content::Yaml(pipeline_str)).unwrap(); + let pipeline = Arc::new(pipeline); + let schema = pipeline.schemas().unwrap(); + let mut schema_info = SchemaInfo::from_schema_list(schema.clone()); + + let pipeline_def = crate::PipelineDefinition::Resolved(pipeline.clone()); + let pipeline_param = crate::GreptimePipelineParams::default(); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); let mut payload = BTreeMap::new(); payload.insert("message".to_string(), Value::String(message)); let payload = Value::Map(payload.into()); + let result = pipeline - .exec_mut(payload) + .exec_mut(payload, &pipeline_ctx, &mut schema_info) .unwrap() .into_transformed() .unwrap(); - let sechema = pipeline.schemas().unwrap(); - assert_eq!(sechema.len(), result.0.values.len()); + // println!("[DEBUG]schema_info: {:?}", schema_info.schema); + // println!("[DEBUG]re: {:?}", result.0.values); + + assert_eq!(schema_info.schema.len(), result.0.values.len()); let test = vec![ ( ColumnDataType::String as i32, @@ -425,8 +565,10 @@ transform: Some(ValueData::TimestampNanosecondValue(1722493367000000000)), ), ]; - for i in 0..sechema.len() { - let schema = &sechema[i]; + // manually set schema + let schema = pipeline.schemas().unwrap(); + for i in 0..schema.len() { + let schema = &schema[i]; let value = &result.0.values[i]; 
assert_eq!(schema.datatype, test[i].0); assert_eq!(value.value_data, test[i].1); @@ -438,7 +580,8 @@ transform: let input_value_str = r#" { "my_field": "1,2", - "foo": "bar" + "foo": "bar", + "ts": "1" } "#; let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap(); @@ -449,17 +592,30 @@ transform: - csv: field: my_field target_fields: field1, field2 + - epoch: + field: ts + resolution: ns transform: - field: field1 type: uint32 - field: field2 type: uint32 + - field: ts + type: timestamp, ns + index: time "#; let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); + let payload = json_to_map(input_value).unwrap(); let result = pipeline - .exec_mut(payload) + .exec_mut(payload, &pipeline_ctx, &mut schema_info) .unwrap() .into_transformed() .unwrap(); @@ -467,7 +623,7 @@ transform: assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2))); match &result.0.values[2].value_data { Some(ValueData::TimestampNanosecondValue(v)) => { - assert_ne!(*v, 0); + assert_ne!(v, &0); } _ => panic!("expect null value"), } @@ -488,7 +644,7 @@ transform: description: Pipeline for Apache Tomcat processors: - - timestamp: + - date: field: test_time transform: @@ -498,11 +654,22 @@ transform: "#; let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); + let pipeline = Arc::new(pipeline); + let schema = pipeline.schemas().unwrap(); + let mut schema_info = SchemaInfo::from_schema_list(schema.clone()); + + let pipeline_def = crate::PipelineDefinition::Resolved(pipeline.clone()); + let pipeline_param = crate::GreptimePipelineParams::default(); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); let schema = pipeline.schemas().unwrap().clone(); let result = json_to_map(input_value).unwrap(); let row = pipeline - .exec_mut(result) + .exec_mut(result, &pipeline_ctx, &mut schema_info) .unwrap() .into_transformed() .unwrap(); @@ -536,6 +703,9 @@ transform: description: Pipeline for Apache Tomcat processors: + - epoch: + field: ts + resolution: ns dispatcher: field: typename @@ -549,7 +719,9 @@ dispatcher: transform: - field: typename type: string - + - field: ts + type: timestamp, ns + index: time "#; let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); let dispatcher = pipeline.dispatcher.expect("expect dispatcher"); @@ -580,6 +752,9 @@ transform: description: Pipeline for Apache Tomcat processors: + - epoch: + field: ts + resolution: ns dispatcher: _field: typename @@ -593,14 +768,18 @@ dispatcher: transform: - field: typename type: string - + - field: ts + type: timestamp, ns + index: time "#; let bad_yaml2 = r#" --- description: Pipeline for Apache Tomcat processors: - + - epoch: + field: ts + resolution: ns dispatcher: field: typename rules: @@ -613,14 +792,18 @@ dispatcher: transform: - field: typename type: string - + - field: ts + type: timestamp, ns + index: time "#; let bad_yaml3 = r#" --- description: Pipeline for Apache Tomcat processors: - + - epoch: + field: ts + resolution: ns dispatcher: field: typename rules: @@ -633,7 +816,9 @@ dispatcher: transform: - field: typename type: string - + - field: ts + type: timestamp, ns + index: time "#; let r: Result = parse(&Content::Yaml(bad_yaml1)); diff --git a/src/pipeline/src/etl/processor.rs 
b/src/pipeline/src/etl/processor.rs index dfee5c03b0..9d3e1d5c0f 100644 --- a/src/pipeline/src/etl/processor.rs +++ b/src/pipeline/src/etl/processor.rs @@ -27,7 +27,6 @@ pub mod letter; pub mod regex; pub mod select; pub mod simple_extract; -pub mod timestamp; pub mod urlencoding; pub mod vrl; @@ -47,7 +46,6 @@ use json_path::JsonPathProcessor; use letter::LetterProcessor; use regex::RegexProcessor; use snafu::{OptionExt, ResultExt}; -use timestamp::TimestampProcessor; use urlencoding::UrlEncodingProcessor; use crate::error::{ @@ -138,7 +136,6 @@ pub enum ProcessorKind { Join(JoinProcessor), Letter(LetterProcessor), Regex(RegexProcessor), - Timestamp(TimestampProcessor), UrlEncoding(UrlEncodingProcessor), Epoch(EpochProcessor), Date(DateProcessor), @@ -211,9 +208,6 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result { join::PROCESSOR_JOIN => ProcessorKind::Join(JoinProcessor::try_from(value)?), letter::PROCESSOR_LETTER => ProcessorKind::Letter(LetterProcessor::try_from(value)?), regex::PROCESSOR_REGEX => ProcessorKind::Regex(RegexProcessor::try_from(value)?), - timestamp::PROCESSOR_TIMESTAMP => { - ProcessorKind::Timestamp(TimestampProcessor::try_from(value)?) - } urlencoding::PROCESSOR_URL_ENCODING => { ProcessorKind::UrlEncoding(UrlEncodingProcessor::try_from(value)?) } diff --git a/src/pipeline/src/etl/processor/date.rs b/src/pipeline/src/etl/processor/date.rs index 102c3fb7f2..e60107a064 100644 --- a/src/pipeline/src/etl/processor/date.rs +++ b/src/pipeline/src/etl/processor/date.rs @@ -148,7 +148,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for DateProcessor { /// Reserved for compatibility only #[derive(Debug, Default)] pub struct DateProcessor { - fields: Fields, + pub(crate) fields: Fields, formats: Formats, timezone: Option>, locale: Option>, // to support locale @@ -162,10 +162,6 @@ pub struct DateProcessor { } impl DateProcessor { - pub(crate) fn target_count(&self) -> usize { - self.fields.len() - } - fn parse(&self, val: &str) -> Result { let mut tz = Tz::UTC; if let Some(timezone) = &self.timezone { diff --git a/src/pipeline/src/etl/processor/epoch.rs b/src/pipeline/src/etl/processor/epoch.rs index 52be05da82..5d70483920 100644 --- a/src/pipeline/src/etl/processor/epoch.rs +++ b/src/pipeline/src/etl/processor/epoch.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use common_time::timestamp::TimeUnit; use snafu::{OptionExt, ResultExt}; use crate::error::{ @@ -34,7 +35,7 @@ pub(crate) const PROCESSOR_EPOCH: &str = "epoch"; const RESOLUTION_NAME: &str = "resolution"; #[derive(Debug, Default)] -enum Resolution { +pub(crate) enum Resolution { Second, #[default] Milli, @@ -56,13 +57,24 @@ impl TryFrom<&str> for Resolution { } } +impl From<&Resolution> for TimeUnit { + fn from(resolution: &Resolution) -> Self { + match resolution { + Resolution::Second => TimeUnit::Second, + Resolution::Milli => TimeUnit::Millisecond, + Resolution::Micro => TimeUnit::Microsecond, + Resolution::Nano => TimeUnit::Nanosecond, + } + } +} + /// support string, integer, float, time, epoch /// deprecated it should be removed in the future /// Reserved for compatibility only #[derive(Debug, Default)] pub struct EpochProcessor { - fields: Fields, - resolution: Resolution, + pub(crate) fields: Fields, + pub(crate) resolution: Resolution, ignore_missing: bool, // description // if @@ -110,10 +122,6 @@ impl EpochProcessor { Resolution::Nano => Ok(Timestamp::Nanosecond(t)), } } - - pub(crate) fn target_count(&self) -> usize { - self.fields.len() - } } impl TryFrom<&yaml_rust::yaml::Hash> for EpochProcessor { diff --git a/src/pipeline/src/etl/processor/timestamp.rs b/src/pipeline/src/etl/processor/timestamp.rs deleted file mode 100644 index 4ea79b0356..0000000000 --- a/src/pipeline/src/etl/processor/timestamp.rs +++ /dev/null @@ -1,418 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use chrono::{DateTime, NaiveDateTime}; -use chrono_tz::Tz; -use lazy_static::lazy_static; -use snafu::{OptionExt, ResultExt}; - -use crate::error::{ - DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateInvalidFormatSnafu, - DateParseSnafu, DateParseTimezoneSnafu, EpochInvalidResolutionSnafu, Error, - KeyMustBeStringSnafu, ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu, - ProcessorUnsupportedValueSnafu, Result, -}; -use crate::etl::field::Fields; -use crate::etl::processor::{ - yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, FIELDS_NAME, - FIELD_NAME, IGNORE_MISSING_NAME, -}; -use crate::etl::value::time::{ - MICROSECOND_RESOLUTION, MICRO_RESOLUTION, MILLISECOND_RESOLUTION, MILLI_RESOLUTION, - MS_RESOLUTION, NANOSECOND_RESOLUTION, NANO_RESOLUTION, NS_RESOLUTION, SECOND_RESOLUTION, - SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION, -}; -use crate::etl::value::{Timestamp, Value}; - -pub(crate) const PROCESSOR_TIMESTAMP: &str = "timestamp"; -const RESOLUTION_NAME: &str = "resolution"; -const FORMATS_NAME: &str = "formats"; // default RFC3339 - -lazy_static! 
{ - static ref DEFAULT_FORMATS: Vec<(Arc,Tz)> = vec![ - // timezone with colon - "%Y-%m-%dT%H:%M:%S%:z", - "%Y-%m-%dT%H:%M:%S%.3f%:z", - "%Y-%m-%dT%H:%M:%S%.6f%:z", - "%Y-%m-%dT%H:%M:%S%.9f%:z", - // timezone without colon - "%Y-%m-%dT%H:%M:%S%z", - "%Y-%m-%dT%H:%M:%S%.3f%z", - "%Y-%m-%dT%H:%M:%S%.6f%z", - "%Y-%m-%dT%H:%M:%S%.9f%z", - // without timezone - "%Y-%m-%dT%H:%M:%SZ", - "%Y-%m-%dT%H:%M:%S", - "%Y-%m-%dT%H:%M:%S%.3f", - "%Y-%m-%dT%H:%M:%S%.6f", - "%Y-%m-%dT%H:%M:%S%.9f", - ] - .iter() - .map(|s| (Arc::new(s.to_string()),Tz::UCT)) - .collect(); -} - -#[derive(Debug, Default)] -enum Resolution { - Second, - #[default] - Milli, - Micro, - Nano, -} - -impl TryFrom<&str> for Resolution { - type Error = Error; - - fn try_from(s: &str) -> Result { - match s { - SECOND_RESOLUTION | SEC_RESOLUTION | S_RESOLUTION => Ok(Resolution::Second), - MILLISECOND_RESOLUTION | MILLI_RESOLUTION | MS_RESOLUTION => Ok(Resolution::Milli), - MICROSECOND_RESOLUTION | MICRO_RESOLUTION | US_RESOLUTION => Ok(Resolution::Micro), - NANOSECOND_RESOLUTION | NANO_RESOLUTION | NS_RESOLUTION => Ok(Resolution::Nano), - _ => EpochInvalidResolutionSnafu { resolution: s }.fail(), - } - } -} - -#[derive(Debug)] -struct Formats(Vec<(Arc, Tz)>); - -impl Formats { - fn new(mut formats: Vec<(Arc, Tz)>) -> Self { - formats.sort_by_key(|(key, _)| key.clone()); - formats.dedup(); - Formats(formats) - } -} - -impl Default for Formats { - fn default() -> Self { - Formats(DEFAULT_FORMATS.clone()) - } -} - -impl std::ops::Deref for Formats { - type Target = Vec<(Arc, Tz)>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -/// support string, integer, float, time, epoch -#[derive(Debug, Default)] -pub struct TimestampProcessor { - fields: Fields, - formats: Formats, - resolution: Resolution, - ignore_missing: bool, - // description - // if - // ignore_failure - // on_failure - // tag -} - -impl TimestampProcessor { - /// try to parse val with timezone first, if failed, parse without timezone - fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result { - if let Ok(dt) = DateTime::parse_from_str(val, fmt) { - Ok(dt - .timestamp_nanos_opt() - .context(DateFailedToGetTimestampSnafu)?) - } else { - let dt = NaiveDateTime::parse_from_str(val, fmt) - .context(DateParseSnafu { value: val })? - .and_local_timezone(tz) - .single() - .context(DateFailedToGetLocalTimezoneSnafu)?; - Ok(dt - .timestamp_nanos_opt() - .context(DateFailedToGetTimestampSnafu)?) 
- } - } - - fn parse_time_str(&self, val: &str) -> Result { - for (fmt, tz) in self.formats.iter() { - if let Ok(ns) = Self::try_parse(val, fmt, *tz) { - return Ok(ns); - } - } - ProcessorFailedToParseStringSnafu { - kind: PROCESSOR_TIMESTAMP, - value: val.to_string(), - } - .fail() - } - - fn parse(&self, val: &Value) -> Result { - let t: i64 = match val { - Value::String(s) => { - let t = s.parse::(); - match t { - Ok(t) => t, - Err(_) => { - let ns = self.parse_time_str(s)?; - return Ok(Timestamp::Nanosecond(ns)); - } - } - } - Value::Int16(i) => *i as i64, - Value::Int32(i) => *i as i64, - Value::Int64(i) => *i, - Value::Uint8(i) => *i as i64, - Value::Uint16(i) => *i as i64, - Value::Uint32(i) => *i as i64, - Value::Uint64(i) => *i as i64, - Value::Float32(f) => *f as i64, - Value::Float64(f) => *f as i64, - - Value::Timestamp(e) => match self.resolution { - Resolution::Second => e.timestamp(), - Resolution::Milli => e.timestamp_millis(), - Resolution::Micro => e.timestamp_micros(), - Resolution::Nano => e.timestamp_nanos(), - }, - - _ => { - return ProcessorUnsupportedValueSnafu { - processor: PROCESSOR_TIMESTAMP, - val: val.to_string(), - } - .fail(); - } - }; - - match self.resolution { - Resolution::Second => Ok(Timestamp::Second(t)), - Resolution::Milli => Ok(Timestamp::Millisecond(t)), - Resolution::Micro => Ok(Timestamp::Microsecond(t)), - Resolution::Nano => Ok(Timestamp::Nanosecond(t)), - } - } - - pub(crate) fn target_count(&self) -> usize { - self.fields.len() - } -} - -fn parse_formats(yaml: &yaml_rust::yaml::Yaml) -> Result, Tz)>> { - match yaml.as_vec() { - Some(formats_yaml) => { - let mut formats = Vec::with_capacity(formats_yaml.len()); - for v in formats_yaml { - let s = yaml_strings(v, FORMATS_NAME) - .or(yaml_string(v, FORMATS_NAME).map(|s| vec![s]))?; - if s.len() != 1 && s.len() != 2 { - return DateInvalidFormatSnafu { - processor: PROCESSOR_TIMESTAMP, - s: format!("{s:?}"), - } - .fail(); - } - let mut iter = s.into_iter(); - // safety: unwrap is safe here - let formatter = iter.next().unwrap(); - let tz = iter - .next() - .map(|tz| { - tz.parse::() - .context(DateParseTimezoneSnafu { value: tz }) - }) - .unwrap_or(Ok(Tz::UTC))?; - formats.push((Arc::new(formatter), tz)); - } - Ok(formats) - } - None => DateInvalidFormatSnafu { - processor: PROCESSOR_TIMESTAMP, - s: format!("{yaml:?}"), - } - .fail(), - } -} - -impl TryFrom<&yaml_rust::yaml::Hash> for TimestampProcessor { - type Error = Error; - - fn try_from(hash: &yaml_rust::yaml::Hash) -> Result { - let mut fields = Fields::default(); - let mut formats = Formats::default(); - let mut resolution = Resolution::default(); - let mut ignore_missing = false; - - for (k, v) in hash { - let key = k - .as_str() - .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?; - - match key { - FIELD_NAME => { - fields = Fields::one(yaml_new_field(v, FIELD_NAME)?); - } - FIELDS_NAME => { - fields = yaml_new_fields(v, FIELDS_NAME)?; - } - FORMATS_NAME => { - let formats_vec = parse_formats(v)?; - formats = Formats::new(formats_vec); - } - RESOLUTION_NAME => { - resolution = yaml_string(v, RESOLUTION_NAME)?.as_str().try_into()?; - } - IGNORE_MISSING_NAME => { - ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?; - } - _ => {} - } - } - - let processor_builder = TimestampProcessor { - fields, - formats, - resolution, - ignore_missing, - }; - - Ok(processor_builder) - } -} - -impl Processor for TimestampProcessor { - fn kind(&self) -> &str { - PROCESSOR_TIMESTAMP - } - - fn ignore_missing(&self) -> bool { - self.ignore_missing 
- } - - fn exec_mut(&self, mut val: Value) -> Result { - for field in self.fields.iter() { - let index = field.input_field(); - match val.get(index) { - Some(Value::Null) | None => { - if !self.ignore_missing { - return ProcessorMissingFieldSnafu { - processor: self.kind(), - field: field.input_field(), - } - .fail(); - } - } - Some(v) => { - let result = self.parse(v)?; - let output_key = field.target_or_input_field(); - val.insert(output_key.to_string(), Value::Timestamp(result))?; - } - } - } - Ok(val) - } -} - -#[cfg(test)] -mod tests { - use yaml_rust::YamlLoader; - - use super::TimestampProcessor; - use crate::etl::value::{Timestamp, Value}; - - #[test] - fn test_parse_epoch() { - let processor_yaml_str = r#"fields: - - hello -resolution: s -formats: - - "%Y-%m-%dT%H:%M:%S%:z" - - "%Y-%m-%dT%H:%M:%S%.3f%:z" - - "%Y-%m-%dT%H:%M:%S" - - "%Y-%m-%dT%H:%M:%SZ" -"#; - let yaml = &YamlLoader::load_from_str(processor_yaml_str).unwrap()[0]; - let timestamp_yaml = yaml.as_hash().unwrap(); - let processor = TimestampProcessor::try_from(timestamp_yaml).unwrap(); - - let values = [ - ( - Value::String("1573840000".into()), - Timestamp::Second(1573840000), - ), - (Value::Int32(1573840001), Timestamp::Second(1573840001)), - (Value::Uint64(1573840002), Timestamp::Second(1573840002)), - // float32 has a problem expressing the timestamp. - // 1573840003.0_f32 as i64 is 1573840000 - //(Value::Float32(1573840003.0), Epoch::Second(1573840003)), - ( - Value::String("2019-11-15T17:46:40Z".into()), - Timestamp::Nanosecond(1573840000000000000), - ), - ]; - - for (value, result) in values { - let parsed = processor.parse(&value).unwrap(); - assert_eq!(parsed, result); - } - let values: Vec<&str> = vec![ - "2014-5-17T12:34:56", - "2014-5-17T12:34:56Z", - "2014-5-17T12:34:56+09:30", - "2014-5-17T12:34:56.000+09:30", - "2014-5-17T12:34:56-0930", - "2014-5-17T12:34:56.000-0930", - ] - .into_iter() - .collect(); - - for value in values { - let parsed = processor.parse(&Value::String(value.into())); - assert!(parsed.is_ok()); - } - } - - #[test] - fn test_parse_with_timezone() { - let processor_yaml_str = r#"fields: - - hello -resolution: s -formats: - - ["%Y-%m-%dT%H:%M:%S%:z", "Asia/Tokyo"] - - ["%Y-%m-%dT%H:%M:%S%.3f%:z", "Asia/Tokyo"] - - ["%Y-%m-%dT%H:%M:%S", "Asia/Tokyo"] - - ["%Y-%m-%dT%H:%M:%SZ", "Asia/Tokyo"] -"#; - let yaml = &YamlLoader::load_from_str(processor_yaml_str).unwrap()[0]; - let timestamp_yaml = yaml.as_hash().unwrap(); - let processor = TimestampProcessor::try_from(timestamp_yaml).unwrap(); - - let values: Vec<&str> = vec![ - "2014-5-17T12:34:56", - "2014-5-17T12:34:56Z", - "2014-5-17T12:34:56+09:30", - "2014-5-17T12:34:56.000+09:30", - "2014-5-17T12:34:56-0930", - "2014-5-17T12:34:56.000-0930", - ] - .into_iter() - .collect(); - - for value in values { - let parsed = processor.parse(&Value::String(value.into())); - assert!(parsed.is_ok()); - } - } -} diff --git a/src/pipeline/src/etl/transform.rs b/src/pipeline/src/etl/transform.rs index 7275481689..b65fb853b7 100644 --- a/src/pipeline/src/etl/transform.rs +++ b/src/pipeline/src/etl/transform.rs @@ -15,11 +15,11 @@ pub mod index; pub mod transformer; -use snafu::OptionExt; +use snafu::{ensure, OptionExt}; use crate::error::{ Error, KeyMustBeStringSnafu, Result, TransformElementMustBeMapSnafu, - TransformOnFailureInvalidValueSnafu, TransformTypeMustBeSetSnafu, + TransformFieldMustBeSetSnafu, TransformOnFailureInvalidValueSnafu, TransformTypeMustBeSetSnafu, }; use crate::etl::field::Fields; use crate::etl::processor::{yaml_bool, 
yaml_new_field, yaml_new_fields, yaml_string}; @@ -216,25 +216,30 @@ impl TryFrom<&yaml_rust::yaml::Hash> for Transform { _ => {} } } - let mut final_default = None; - if let Some(default_value) = default { - match (&type_, &default_value) { - (Value::Null, _) => { - return TransformTypeMustBeSetSnafu { - fields: format!("{:?}", fields), - default: default_value.to_string(), - } - .fail(); - } - (_, Value::Null) => {} // if default is not set, then it will be regarded as default null - (_, _) => { + // ensure fields and type + ensure!(!fields.is_empty(), TransformFieldMustBeSetSnafu); + ensure!( + type_ != Value::Null, + TransformTypeMustBeSetSnafu { + fields: format!("{:?}", fields) + } + ); + + let final_default = if let Some(default_value) = default { + match default_value { + // if default is not set, then it will be regarded as default null + Value::Null => None, + _ => { let target = type_.parse_str_value(default_value.to_str_value().as_str())?; - final_default = Some(target); on_failure = Some(OnFailure::Default); + Some(target) } } - } + } else { + None + }; + let builder = Transform { fields, type_, diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index 48f5cb4793..8494d638a0 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -42,6 +42,7 @@ use crate::etl::field::{Field, Fields}; use crate::etl::transform::index::Index; use crate::etl::transform::{Transform, Transforms}; use crate::etl::value::{Timestamp, Value}; +use crate::etl::PipelineDocVersion; use crate::{unwrap_or_continue_if_err, Map, PipelineContext}; const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp"; @@ -160,7 +161,7 @@ impl GreptimeTransformer { } impl GreptimeTransformer { - pub fn new(mut transforms: Transforms) -> Result { + pub fn new(mut transforms: Transforms, doc_version: &PipelineDocVersion) -> Result { // empty check is done in the caller let mut column_names_set = HashSet::new(); let mut timestamp_columns = vec![]; @@ -202,34 +203,34 @@ impl GreptimeTransformer { } } - match timestamp_columns.len() { - 0 => { + let schema = match timestamp_columns.len() { + 0 if doc_version == &PipelineDocVersion::V1 => { + // compatible with v1, add a default timestamp column GreptimeTransformer::add_greptime_timestamp_column(&mut transforms); - - let schema = GreptimeTransformer::init_schemas(&transforms)?; - Ok(GreptimeTransformer { transforms, schema }) + GreptimeTransformer::init_schemas(&transforms)? 
} - 1 => { - let schema = GreptimeTransformer::init_schemas(&transforms)?; - Ok(GreptimeTransformer { transforms, schema }) + 1 => GreptimeTransformer::init_schemas(&transforms)?, + count => { + let columns = timestamp_columns.iter().join(", "); + return TransformTimestampIndexCountSnafu { count, columns }.fail(); } - _ => { - let columns: String = timestamp_columns.iter().map(|s| s.to_string()).join(", "); - let count = timestamp_columns.len(); - TransformTimestampIndexCountSnafu { count, columns }.fail() - } - } + }; + Ok(GreptimeTransformer { transforms, schema }) } - pub fn transform_mut(&self, pipeline_map: &mut Value) -> Result<(ContextOpt, Row)> { - let opt = ContextOpt::from_pipeline_map_to_opt(pipeline_map)?; - + pub fn transform_mut( + &self, + pipeline_map: &mut Value, + is_v1: bool, + ) -> Result> { let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()]; let mut output_index = 0; for transform in self.transforms.iter() { for field in transform.fields.iter() { - let index = field.input_field(); - match pipeline_map.get(index) { + let column_name = field.input_field(); + + // let keep us `get` here to be compatible with v1 + match pipeline_map.get(column_name) { Some(v) => { let value_data = coerce_value(v, transform)?; // every transform fields has only one output field @@ -256,9 +257,14 @@ impl GreptimeTransformer { } } output_index += 1; + if !is_v1 { + // remove the column from the pipeline_map + // so that the auto-transform can use the rest fields + pipeline_map.remove(column_name); + } } } - Ok((opt, Row { values })) + Ok(values) } pub fn transforms(&self) -> &Transforms { @@ -292,6 +298,17 @@ impl SchemaInfo { index: HashMap::with_capacity(capacity), } } + + pub fn from_schema_list(schema_list: Vec) -> Self { + let mut index = HashMap::new(); + for (i, schema) in schema_list.iter().enumerate() { + index.insert(schema.column_name.clone(), i); + } + Self { + schema: schema_list, + index, + } + } } fn resolve_schema( @@ -398,12 +415,14 @@ fn calc_ts(p_ctx: &PipelineContext, values: &Value) -> Result> } } -fn values_to_row( +pub(crate) fn values_to_row( schema_info: &mut SchemaInfo, values: Value, pipeline_ctx: &PipelineContext<'_>, + row: Option>, ) -> Result { - let mut row: Vec = Vec::with_capacity(schema_info.schema.len()); + let mut row: Vec = + row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len())); let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts(); // calculate timestamp value based on the channel @@ -411,9 +430,7 @@ fn values_to_row( row.push(GreptimeValue { value_data: ts }); - for _ in 1..schema_info.schema.len() { - row.push(GreptimeValue { value_data: None }); - } + row.resize(schema_info.schema.len(), GreptimeValue { value_data: None }); // skip ts column let ts_column_name = custom_ts @@ -591,7 +608,7 @@ fn identity_pipeline_inner( skip_error ); let row = unwrap_or_continue_if_err!( - values_to_row(&mut schema_info, pipeline_map, pipeline_ctx), + values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None), skip_error ); diff --git a/src/pipeline/src/etl/value.rs b/src/pipeline/src/etl/value.rs index ccc07e929f..f5904ed31e 100644 --- a/src/pipeline/src/etl/value.rs +++ b/src/pipeline/src/etl/value.rs @@ -330,6 +330,13 @@ impl Value { } } + pub fn as_map(&self) -> Option<&BTreeMap> { + match self { + Value::Map(map) => Some(map), + _ => None, + } + } + pub fn into_map(self) -> Option> { match self { Value::Map(map) => Some(map.values), diff --git a/src/pipeline/src/lib.rs b/src/pipeline/src/lib.rs index 
c49eb45196..30ceb6866e 100644 --- a/src/pipeline/src/lib.rs +++ b/src/pipeline/src/lib.rs @@ -26,8 +26,8 @@ pub use etl::transform::transformer::identity_pipeline; pub use etl::transform::GreptimeTransformer; pub use etl::value::{Array, Map, Value}; pub use etl::{ - json_array_to_map, json_to_map, parse, simd_json_array_to_map, simd_json_to_map, - AutoTransformOutput, Content, DispatchedTo, Pipeline, PipelineExecOutput, TransformedOutput, + json_array_to_map, json_to_map, parse, simd_json_array_to_map, simd_json_to_map, Content, + DispatchedTo, Pipeline, PipelineExecOutput, TransformedOutput, TransformerMode, }; pub use manager::{ pipeline_operator, table, util, IdentityTimeIndex, PipelineContext, PipelineDefinition, diff --git a/src/pipeline/tests/common.rs b/src/pipeline/tests/common.rs index 6bc0582aea..5285053861 100644 --- a/src/pipeline/tests/common.rs +++ b/src/pipeline/tests/common.rs @@ -13,7 +13,7 @@ // limitations under the License. use greptime_proto::v1::{ColumnDataType, ColumnSchema, Rows, SemanticType}; -use pipeline::{json_to_map, parse, Content, Pipeline}; +use pipeline::{json_to_map, parse, setup_pipeline, Content, Pipeline, PipelineContext}; /// test util function to parse and execute pipeline pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows { @@ -22,7 +22,12 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows { let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).expect("failed to parse pipeline"); - let schema = pipeline.schemas().unwrap().clone(); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); let mut rows = Vec::new(); @@ -31,7 +36,7 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows { for value in array { let intermediate_status = json_to_map(value).unwrap(); let row = pipeline - .exec_mut(intermediate_status) + .exec_mut(intermediate_status, &pipeline_ctx, &mut schema_info) .expect("failed to exec pipeline") .into_transformed() .expect("expect transformed result "); @@ -41,7 +46,7 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows { serde_json::Value::Object(_) => { let intermediate_status = json_to_map(input_value).unwrap(); let row = pipeline - .exec_mut(intermediate_status) + .exec_mut(intermediate_status, &pipeline_ctx, &mut schema_info) .expect("failed to exec pipeline") .into_transformed() .expect("expect transformed result "); @@ -52,7 +57,10 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows { } } - Rows { schema, rows } + Rows { + schema: schema_info.schema.clone(), + rows, + } } /// test util function to create column schema diff --git a/src/pipeline/tests/dissect.rs b/src/pipeline/tests/dissect.rs index 85ec7a6310..7eb3df749f 100644 --- a/src/pipeline/tests/dissect.rs +++ b/src/pipeline/tests/dissect.rs @@ -16,7 +16,7 @@ mod common; use greptime_proto::v1::value::ValueData::StringValue; use greptime_proto::v1::{ColumnDataType, SemanticType}; -use pipeline::json_to_map; +use pipeline::{json_to_map, setup_pipeline, PipelineContext}; fn make_string_column_schema(name: String) -> greptime_proto::v1::ColumnSchema { common::make_column_schema(name, ColumnDataType::String, SemanticType::Field) @@ -274,9 +274,17 @@ transform: let yaml_content = pipeline::Content::Yaml(pipeline_yaml); let pipeline: pipeline::Pipeline = 
pipeline::parse(&yaml_content).expect("failed to parse pipeline"); + + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); + let result = json_to_map(input_value).unwrap(); - let row = pipeline.exec_mut(result); + let row = pipeline.exec_mut(result, &pipeline_ctx, &mut schema_info); assert!(row.is_err()); assert_eq!(row.err().unwrap().to_string(), "No matching pattern found"); diff --git a/src/pipeline/tests/pipeline.rs b/src/pipeline/tests/pipeline.rs index cd2f771208..16146cbc2a 100644 --- a/src/pipeline/tests/pipeline.rs +++ b/src/pipeline/tests/pipeline.rs @@ -20,7 +20,7 @@ use greptime_proto::v1::value::ValueData::{ U32Value, U64Value, U8Value, }; use greptime_proto::v1::Value as GreptimeValue; -use pipeline::{json_to_map, parse, Content, Pipeline}; +use pipeline::{json_to_map, parse, setup_pipeline, Content, Pipeline, PipelineContext}; #[test] fn test_complex_data() { @@ -419,10 +419,16 @@ transform: let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).expect("failed to parse pipeline"); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); let stats = json_to_map(input_value).unwrap(); let row = pipeline - .exec_mut(stats) + .exec_mut(stats, &pipeline_ctx, &mut schema_info) .expect("failed to exec pipeline") .into_transformed() .expect("expect transformed result "); @@ -487,10 +493,16 @@ transform: let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); let status = json_to_map(input_value).unwrap(); let row = pipeline - .exec_mut(status) + .exec_mut(status, &pipeline_ctx, &mut schema_info) .unwrap() .into_transformed() .expect("expect transformed result "); @@ -596,10 +608,16 @@ transform: let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); let status = json_to_map(input_value).unwrap(); let row = pipeline - .exec_mut(status) + .exec_mut(status, &pipeline_ctx, &mut schema_info) .unwrap() .into_transformed() .expect("expect transformed result "); @@ -662,10 +680,16 @@ transform: let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); let status = json_to_map(input_value).unwrap(); let row = pipeline - .exec_mut(status) + .exec_mut(status, &pipeline_ctx, &mut schema_info) .unwrap() .into_transformed() .expect("expect transformed result "); @@ -702,10 +726,16 @@ transform: "#; let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = 
setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); let status = json_to_map(input_value).unwrap(); let row = pipeline - .exec_mut(status) + .exec_mut(status, &pipeline_ctx, &mut schema_info) .unwrap() .into_transformed() .expect("expect transformed result "); @@ -761,10 +791,16 @@ transform: let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); let status = json_to_map(input_value).unwrap(); let row = pipeline - .exec_mut(status) + .exec_mut(status, &pipeline_ctx, &mut schema_info) .unwrap() .into_transformed() .expect("expect transformed result "); @@ -802,10 +838,16 @@ transform: let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); let status = json_to_map(input_value).unwrap(); let row = pipeline - .exec_mut(status) + .exec_mut(status, &pipeline_ctx, &mut schema_info) .unwrap() .into_transformed() .expect("expect transformed result "); @@ -864,10 +906,16 @@ transform: let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); let status = json_to_map(input_value1).unwrap(); let dispatched_to = pipeline - .exec_mut(status) + .exec_mut(status, &pipeline_ctx, &mut schema_info) .unwrap() .into_dispatched() .expect("expect dispatched result "); @@ -876,7 +924,7 @@ transform: let status = json_to_map(input_value2).unwrap(); let row = pipeline - .exec_mut(status) + .exec_mut(status, &pipeline_ctx, &mut schema_info) .unwrap() .into_transformed() .expect("expect transformed result "); @@ -928,9 +976,17 @@ table_suffix: _${logger} let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); let status = json_to_map(input_value).unwrap(); - let exec_re = pipeline.exec_mut(status).unwrap(); + let exec_re = pipeline + .exec_mut(status, &pipeline_ctx, &mut schema_info) + .unwrap(); let (row, table_name) = exec_re.into_transformed().unwrap(); let values = row.values; diff --git a/src/pipeline/tests/timestamp.rs b/src/pipeline/tests/timestamp.rs deleted file mode 100644 index 8f7d3d0bc3..0000000000 --- a/src/pipeline/tests/timestamp.rs +++ /dev/null @@ -1,422 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -mod common; - -use api::v1::ColumnSchema; -use greptime_proto::v1::value::ValueData; -use greptime_proto::v1::{ColumnDataType, SemanticType}; -use lazy_static::lazy_static; - -const TEST_INPUT: &str = r#" -{ - "input_str": "2024-06-27T06:13:36.991Z" -}"#; - -const TEST_VALUE: Option = - Some(ValueData::TimestampNanosecondValue(1719468816991000000)); - -lazy_static! { - static ref EXPECTED_SCHEMA: Vec = vec![ - common::make_column_schema( - "ts".to_string(), - ColumnDataType::TimestampNanosecond, - SemanticType::Field, - ), - common::make_column_schema( - "greptime_timestamp".to_string(), - ColumnDataType::TimestampNanosecond, - SemanticType::Timestamp, - ), - ]; -} - -#[test] -fn test_timestamp_parse_date() { - let pipeline_yaml = r#" -processors: - - timestamp: - fields: - - input_str - formats: - - "%Y-%m-%dT%H:%M:%S%.3fZ" - -transform: - - fields: - - input_str, ts - type: time -"#; - - let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml); - assert_eq!(output.schema, *EXPECTED_SCHEMA); - assert_eq!(output.rows[0].values[0].value_data, TEST_VALUE); -} - -#[test] -fn test_timestamp_multi_formats() { - let pipeline_yaml = r#" -processors: - - timestamp: - fields: - - input_str - formats: - - "%Y-%m-%dT%H:%M:%S" - - "%Y-%m-%dT%H:%M:%S%.3fZ" - -transform: - - fields: - - input_str, ts - type: time -"#; - - let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml); - assert_eq!(output.schema, *EXPECTED_SCHEMA); - assert_eq!(output.rows[0].values[0].value_data, TEST_VALUE); -} - -#[test] -fn test_timestamp_ignore_missing() { - { - let empty_input = r#"{}"#; - - let pipeline_yaml = r#" -processors: - - timestamp: - fields: - - input_str - formats: - - "%Y-%m-%dT%H:%M:%S" - - "%Y-%m-%dT%H:%M:%S%.3fZ" - ignore_missing: true - -transform: - - fields: - - input_str, ts - type: time -"#; - - let output = common::parse_and_exec(empty_input, pipeline_yaml); - assert_eq!(output.schema, *EXPECTED_SCHEMA); - assert_eq!(output.rows[0].values[0].value_data, None); - } - { - let empty_input = r#"{}"#; - - let pipeline_yaml = r#" - processors: - - timestamp: - field: input_s - resolution: s - ignore_missing: true - - transform: - - fields: - - input_s, ts - type: timestamp, s - "#; - - let expected_schema = vec![ - common::make_column_schema( - "ts".to_string(), - ColumnDataType::TimestampSecond, - SemanticType::Field, - ), - common::make_column_schema( - "greptime_timestamp".to_string(), - ColumnDataType::TimestampNanosecond, - SemanticType::Timestamp, - ), - ]; - - let output = common::parse_and_exec(empty_input, pipeline_yaml); - assert_eq!(output.schema, expected_schema); - assert_eq!(output.rows[0].values[0].value_data, None); - } -} - -#[test] -fn test_timestamp_timezone() { - let pipeline_yaml = r#" -processors: - - timestamp: - fields: - - input_str - formats: - - ["%Y-%m-%dT%H:%M:%S", "Asia/Shanghai"] - - ["%Y-%m-%dT%H:%M:%S%.3fZ", "Asia/Shanghai"] - ignore_missing: true - -transform: - - fields: - - input_str, ts - type: time -"#; - - let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml); - assert_eq!(output.schema, *EXPECTED_SCHEMA); - assert_eq!( - 
output.rows[0].values[0].value_data, - Some(ValueData::TimestampNanosecondValue(1719440016991000000)) - ); -} - -#[test] -fn test_timestamp_parse_epoch() { - let test_input = r#" - { - "input_s": "1722580862", - "input_sec": "1722580862", - "input_second": "1722580862", - "input_ms": "1722580887794", - "input_millisecond": "1722580887794", - "input_milli": "1722580887794", - "input_default": "1722580887794", - "input_us": "1722580905423969", - "input_microsecond": "1722580905423969", - "input_micro": "1722580905423969", - "input_ns": "1722580929863842048", - "input_nanosecond": "1722580929863842048", - "input_nano": "1722580929863842048" - }"#; - - let pipeline_yaml = r#" -processors: - - timestamp: - field: input_s - resolution: s - - timestamp: - field: input_sec - resolution: sec - - timestamp: - field: input_second - resolution: second - - timestamp: - field: input_ms - resolution: ms - - timestamp: - field: input_millisecond - resolution: millisecond - - timestamp: - field: input_milli - resolution: milli - - timestamp: - field: input_default - - timestamp: - field: input_us - resolution: us - - timestamp: - field: input_microsecond - resolution: microsecond - - timestamp: - field: input_micro - resolution: micro - - timestamp: - field: input_ns - resolution: ns - - timestamp: - field: input_nanosecond - resolution: nanosecond - - timestamp: - field: input_nano - resolution: nano - -transform: - - field: input_s - type: timestamp, s - - field: input_sec - type: timestamp, sec - - field: input_second - type: timestamp, second - - - field: input_ms - type: timestamp, ms - - field: input_millisecond - type: timestamp, millisecond - - field: input_milli - type: timestamp, milli - - field: input_default - type: timestamp, milli - - - field: input_us - type: timestamp, us - - field: input_microsecond - type: timestamp, microsecond - - field: input_micro - type: timestamp, micro - - - field: input_ns - type: timestamp, ns - - field: input_nanosecond - type: timestamp, nanosecond - - field: input_nano - type: timestamp, nano -"#; - fn make_time_field(name: &str, datatype: ColumnDataType) -> ColumnSchema { - common::make_column_schema(name.to_string(), datatype, SemanticType::Field) - } - - let expected_schema = vec![ - make_time_field("input_s", ColumnDataType::TimestampSecond), - make_time_field("input_sec", ColumnDataType::TimestampSecond), - make_time_field("input_second", ColumnDataType::TimestampSecond), - make_time_field("input_ms", ColumnDataType::TimestampMillisecond), - make_time_field("input_millisecond", ColumnDataType::TimestampMillisecond), - make_time_field("input_milli", ColumnDataType::TimestampMillisecond), - make_time_field("input_default", ColumnDataType::TimestampMillisecond), - make_time_field("input_us", ColumnDataType::TimestampMicrosecond), - make_time_field("input_microsecond", ColumnDataType::TimestampMicrosecond), - make_time_field("input_micro", ColumnDataType::TimestampMicrosecond), - make_time_field("input_ns", ColumnDataType::TimestampNanosecond), - make_time_field("input_nanosecond", ColumnDataType::TimestampNanosecond), - make_time_field("input_nano", ColumnDataType::TimestampNanosecond), - common::make_column_schema( - "greptime_timestamp".to_string(), - ColumnDataType::TimestampNanosecond, - SemanticType::Timestamp, - ), - ]; - - let output = common::parse_and_exec(test_input, pipeline_yaml); - assert_eq!(output.schema, expected_schema); - - for i in 0..2 { - assert_eq!( - output.rows[0].values[i].value_data, - 
-            Some(ValueData::TimestampSecondValue(1722580862))
-        );
-    }
-    for i in 3..6 {
-        assert_eq!(
-            output.rows[0].values[i].value_data,
-            Some(ValueData::TimestampMillisecondValue(1722580887794))
-        );
-    }
-    for i in 7..9 {
-        assert_eq!(
-            output.rows[0].values[i].value_data,
-            Some(ValueData::TimestampMicrosecondValue(1722580905423969))
-        );
-    }
-    for i in 10..12 {
-        assert_eq!(
-            output.rows[0].values[i].value_data,
-            Some(ValueData::TimestampNanosecondValue(1722580929863842048))
-        );
-    }
-}
-
-#[test]
-fn test_timestamp_default_wrong_resolution() {
-    // same as test_default_wrong_resolution from epoch tests
-    let test_input = r#"
-    {
-        "input_s": "1722580862",
-        "input_nano": "1722583122284583936"
-    }"#;
-
-    let pipeline_yaml = r#"
-processors:
-  - timestamp:
-      field: input_s
-      resolution: s
-  - timestamp:
-      field: input_nano
-      resolution: ns
-
-transform:
-  - fields:
-      - input_s
-    type: timestamp, ms
-  - fields:
-      - input_nano
-    type: timestamp, ms
-"#;
-
-    let expected_schema = vec![
-        common::make_column_schema(
-            "input_s".to_string(),
-            ColumnDataType::TimestampMillisecond,
-            SemanticType::Field,
-        ),
-        common::make_column_schema(
-            "input_nano".to_string(),
-            ColumnDataType::TimestampMillisecond,
-            SemanticType::Field,
-        ),
-        common::make_column_schema(
-            "greptime_timestamp".to_string(),
-            ColumnDataType::TimestampNanosecond,
-            SemanticType::Timestamp,
-        ),
-    ];
-
-    let output = common::parse_and_exec(test_input, pipeline_yaml);
-    assert_eq!(output.schema, expected_schema);
-    assert_eq!(
-        output.rows[0].values[0].value_data,
-        Some(ValueData::TimestampMillisecondValue(1722580862000))
-    );
-    assert_eq!(
-        output.rows[0].values[1].value_data,
-        Some(ValueData::TimestampMillisecondValue(1722583122284))
-    );
-}
-
-#[test]
-fn test_timestamp_without_processor() {
-    let test_input = r#"
-    {
-        "input_s": 1722580862,
-        "input_nano": 1722583122284583936
-    }"#;
-
-    let pipeline_yaml = r#"
-transform:
-  - fields:
-      - input_s
-    type: timestamp, s
-  - fields:
-      - input_nano
-    type: timestamp, ns
-"#;
-
-    let expected_schema = vec![
-        common::make_column_schema(
-            "input_s".to_string(),
-            ColumnDataType::TimestampSecond,
-            SemanticType::Field,
-        ),
-        common::make_column_schema(
-            "input_nano".to_string(),
-            ColumnDataType::TimestampNanosecond,
-            SemanticType::Field,
-        ),
-        common::make_column_schema(
-            "greptime_timestamp".to_string(),
-            ColumnDataType::TimestampNanosecond,
-            SemanticType::Timestamp,
-        ),
-    ];
-
-    let output = common::parse_and_exec(test_input, pipeline_yaml);
-    assert_eq!(output.schema, expected_schema);
-    assert_eq!(
-        output.rows[0].values[0].value_data,
-        Some(ValueData::TimestampSecondValue(1722580862))
-    );
-    assert_eq!(
-        output.rows[0].values[1].value_data,
-        Some(ValueData::TimestampNanosecondValue(1722583122284583936))
-    );
-}
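Note on the deleted tests above: they all asserted the same contract, namely that each `resolution` alias accepted by the timestamp processor maps to one fixed timestamp column type in the output schema. The sketch below is only a summary of what those removed assertions covered; the helper name `resolution_to_type` is hypothetical and not an API of the pipeline crate.

use greptime_proto::v1::ColumnDataType;

// Hypothetical helper condensing the schema expectations of the removed tests:
// "s"/"sec"/"second" -> TimestampSecond; "ms"/"milli"/"millisecond" and the
// default (no resolution) -> TimestampMillisecond; "us"/"micro"/"microsecond"
// -> TimestampMicrosecond; "ns"/"nano"/"nanosecond" -> TimestampNanosecond.
fn resolution_to_type(resolution: Option<&str>) -> ColumnDataType {
    match resolution {
        Some("s") | Some("sec") | Some("second") => ColumnDataType::TimestampSecond,
        Some("ms") | Some("milli") | Some("millisecond") | None => {
            ColumnDataType::TimestampMillisecond
        }
        Some("us") | Some("micro") | Some("microsecond") => ColumnDataType::TimestampMicrosecond,
        Some("ns") | Some("nano") | Some("nanosecond") => ColumnDataType::TimestampNanosecond,
        Some(other) => panic!("unknown resolution: {other}"),
    }
}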
diff --git a/src/servers/src/pipeline.rs b/src/servers/src/pipeline.rs
index 9da6ecc86f..2ddab66728 100644
--- a/src/servers/src/pipeline.rs
+++ b/src/servers/src/pipeline.rs
@@ -16,16 +16,16 @@ use std::collections::BTreeMap;
 use std::sync::Arc;
 
 use ahash::{HashMap, HashMapExt};
-use api::v1::{RowInsertRequest, Rows};
-use itertools::Itertools;
-use pipeline::error::AutoTransformOneTimestampSnafu;
+use api::greptime_proto;
+use api::v1::{ColumnDataType, ColumnSchema, RowInsertRequest, Rows, SemanticType};
+use common_time::timestamp::TimeUnit;
 use pipeline::{
-    unwrap_or_continue_if_err, AutoTransformOutput, ContextReq, DispatchedTo, IdentityTimeIndex,
-    Pipeline, PipelineContext, PipelineDefinition, PipelineExecOutput, TransformedOutput, Value,
+    unwrap_or_continue_if_err, ContextReq, DispatchedTo, Pipeline, PipelineContext,
+    PipelineDefinition, PipelineExecOutput, SchemaInfo, TransformedOutput, TransformerMode, Value,
     GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
 };
 use session::context::{Channel, QueryContextRef};
-use snafu::{OptionExt, ResultExt};
+use snafu::ResultExt;
 
 use crate::error::{CatalogSnafu, PipelineSnafu, Result};
 use crate::http::event::PipelineIngestRequest;
@@ -118,12 +118,35 @@
     let arr_len = pipeline_maps.len();
     let mut transformed_map = HashMap::new();
     let mut dispatched: BTreeMap<DispatchedTo, Vec<Value>> = BTreeMap::new();
-    let mut auto_map = HashMap::new();
-    let mut auto_map_ts_keys = HashMap::new();
+
+    let mut schema_info = match pipeline.transformer() {
+        TransformerMode::GreptimeTransformer(greptime_transformer) => {
+            SchemaInfo::from_schema_list(greptime_transformer.schemas().clone())
+        }
+        TransformerMode::AutoTransform(ts_name, timeunit) => {
+            let timeunit = match timeunit {
+                TimeUnit::Second => ColumnDataType::TimestampSecond,
+                TimeUnit::Millisecond => ColumnDataType::TimestampMillisecond,
+                TimeUnit::Microsecond => ColumnDataType::TimestampMicrosecond,
+                TimeUnit::Nanosecond => ColumnDataType::TimestampNanosecond,
+            };
+
+            let mut schema_info = SchemaInfo::default();
+            schema_info.schema.push(ColumnSchema {
+                column_name: ts_name.clone(),
+                datatype: timeunit.into(),
+                semantic_type: SemanticType::Timestamp as i32,
+                datatype_extension: None,
+                options: None,
+            });
+
+            schema_info
+        }
+    };
 
     for pipeline_map in pipeline_maps {
         let result = pipeline
-            .exec_mut(pipeline_map)
+            .exec_mut(pipeline_map, pipeline_ctx, &mut schema_info)
             .inspect_err(|_| {
                 METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
                     .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
@@ -137,23 +160,10 @@
                 opt,
                 row,
                 table_suffix,
-                pipeline_map: _val,
             }) => {
                 let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
                 push_to_map!(transformed_map, (opt, act_table_name), row, arr_len);
             }
-            PipelineExecOutput::AutoTransform(AutoTransformOutput {
-                table_suffix,
-                ts_unit_map,
-                pipeline_map,
-            }) => {
-                let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
-                push_to_map!(auto_map, act_table_name.clone(), pipeline_map, arr_len);
-                auto_map_ts_keys
-                    .entry(act_table_name)
-                    .or_insert_with(HashMap::new)
-                    .extend(ts_unit_map);
-            }
             PipelineExecOutput::DispatchedTo(dispatched_to, val) => {
                 push_to_map!(dispatched, dispatched_to, val, arr_len);
             }
@@ -162,61 +172,24 @@
 
     let mut results = ContextReq::default();
 
-    if let Some(s) = pipeline.schemas() {
-        // transformed
+    let s_len = schema_info.schema.len();
 
-        // if current pipeline generates some transformed results, build it as
-        // `RowInsertRequest` and append to results. If the pipeline doesn't
-        // have dispatch, this will be only output of the pipeline.
-        for ((opt, table_name), rows) in transformed_map {
-            results.add_row(
-                opt,
-                RowInsertRequest {
-                    rows: Some(Rows {
-                        rows,
-                        schema: s.clone(),
-                    }),
-                    table_name,
-                },
-            );
-        }
-    } else {
-        // auto map
-        for (table_name, pipeline_maps) in auto_map {
-            if pipeline_maps.is_empty() {
-                continue;
-            }
-
-            let ts_unit_map = auto_map_ts_keys
-                .remove(&table_name)
-                .context(AutoTransformOneTimestampSnafu)
-                .context(PipelineSnafu)?;
-            // only one timestamp key is allowed
-            // which will be converted to ts index
-            let (ts_key, unit) = ts_unit_map
-                .into_iter()
-                .exactly_one()
-                .map_err(|_| AutoTransformOneTimestampSnafu.build())
-                .context(PipelineSnafu)?;
-
-            let ident_ts_index = IdentityTimeIndex::Epoch(ts_key.to_string(), unit, false);
-            let new_def = PipelineDefinition::GreptimeIdentityPipeline(Some(ident_ts_index));
-            let next_pipeline_ctx =
-                PipelineContext::new(&new_def, pipeline_ctx.pipeline_param, pipeline_ctx.channel);
-
-            let reqs = run_identity_pipeline(
-                handler,
-                &next_pipeline_ctx,
-                PipelineIngestRequest {
-                    table: table_name,
-                    values: pipeline_maps,
-                },
-                query_ctx,
-            )
-            .await?;
-
-            results.merge(reqs);
+    // if transformed
+    for ((opt, table_name), mut rows) in transformed_map {
+        for row in rows.iter_mut() {
+            row.values
+                .resize(s_len, greptime_proto::v1::Value::default());
         }
+        results.add_row(
+            opt,
+            RowInsertRequest {
+                rows: Some(Rows {
+                    rows,
+                    schema: schema_info.schema.clone(),
+                }),
+                table_name,
+            },
+        );
     }
 
     // if current pipeline contains dispatcher and has several rules, we may
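Note on the hunk above: because rows are now built against a single shared `SchemaInfo`, rows transformed early in a batch can carry fewer values than the final schema once later rows introduce new columns, so every row is padded with default (null) values up to the schema length before the `Rows` payload is assembled. The sketch below only restates that invariant in isolation; the function name and its standalone form are illustrative, not part of the servers crate.

use api::v1::Row;
use greptime_proto::v1::Value;

// Illustrative restatement of the resize done in run_custom_pipeline: pad each
// row so its value count matches the final schema length; missing trailing
// columns become default (null) values.
fn pad_rows_to_schema(rows: &mut [Row], schema_len: usize) {
    for row in rows.iter_mut() {
        row.values.resize(schema_len, Value::default());
    }
}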
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index ad241f85a2..db1df56004 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -108,6 +108,7 @@ macro_rules! http_tests {
         test_pipeline_context,
         test_pipeline_with_vrl,
         test_pipeline_with_hint_vrl,
+        test_pipeline_2,
 
         test_pipeline_skip_error,
         test_otlp_metrics,
@@ -2497,6 +2498,87 @@ transform:
     guard.remove_all().await;
 }
 
+pub async fn test_pipeline_2(storage_type: StorageType) {
+    common_telemetry::init_default_ut_logging();
+    let (app, mut guard) =
+        setup_test_http_app_with_frontend(storage_type, "test_pipeline_2").await;
+
+    // handshake
+    let client = TestClient::new(app).await;
+
+    let pipeline = r#"
+version: 2
+processors:
+  - date:
+      field: time
+      formats:
+        - "%Y-%m-%d %H:%M:%S%.3f"
+
+transform:
+  - field: id1
+    type: int32
+    index: inverted
+  - field: time
+    type: time
+    index: timestamp
+"#;
+
+    // 1. create pipeline
+    let res = client
+        .post("/v1/events/pipelines/root")
+        .header("Content-Type", "application/x-yaml")
+        .body(pipeline)
+        .send()
+        .await;
+
+    assert_eq!(res.status(), StatusCode::OK);
+
+    // 2. write data
+    let data_body = r#"
+[
+    {
+        "id1": "123",
+        "id2": "2436",
+        "time": "2024-05-25 20:16:37.217"
+    }
+]
+"#;
+    let res = client
+        .post("/v1/events/logs?db=public&table=d_table&pipeline_name=root")
+        .header("Content-Type", "application/json")
+        .body(data_body)
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+
+    // CREATE TABLE IF NOT EXISTS "d_table" (
+    //   "id1" INT NULL INVERTED INDEX,
+    //   "time" TIMESTAMP(9) NOT NULL,
+    //   "id2" STRING NULL,
+    //   TIME INDEX ("time")
+    // )
+    // ENGINE=mito
+    // WITH(
+    //   append_mode = 'true'
+    // )
+    validate_data(
+        "test_pipeline_2_schema",
+        &client,
+        "show create table d_table",
+        "[[\"d_table\",\"CREATE TABLE IF NOT EXISTS \\\"d_table\\\" (\\n \\\"id1\\\" INT NULL INVERTED INDEX,\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n \\\"id2\\\" STRING NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]",
+    )
+    .await;
+
+    validate_data(
+        "test_pipeline_2_data",
+        &client,
+        "select * from d_table",
+        "[[123,1716668197217000000,\"2436\"]]",
+    )
+    .await;
+
+    guard.remove_all().await;
+}
+
 pub async fn test_identity_pipeline_with_flatten(store_type: StorageType) {
     common_telemetry::init_default_ut_logging();
     let (app, mut guard) =