diff --git a/src/pipeline/benches/processor.rs b/src/pipeline/benches/processor.rs index 74b136e873..d27b087ba3 100644 --- a/src/pipeline/benches/processor.rs +++ b/src/pipeline/benches/processor.rs @@ -33,7 +33,7 @@ fn processor_mut( .exec_mut(v, pipeline_ctx, schema_info)? .into_transformed() .expect("expect transformed result "); - result.push(r.0); + result.extend(r.into_iter().map(|v| v.0)); } Ok(result) diff --git a/src/pipeline/src/error.rs b/src/pipeline/src/error.rs index 73ffb711a1..651f1cd4a9 100644 --- a/src/pipeline/src/error.rs +++ b/src/pipeline/src/error.rs @@ -19,6 +19,7 @@ use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use datatypes::timestamp::TimestampNanosecond; use snafu::{Location, Snafu}; +use vrl::value::Kind; #[derive(Snafu)] #[snafu(visibility(pub))] @@ -676,8 +677,12 @@ pub enum Error { location: Location, }, - #[snafu(display("Vrl script should return `.` in the end"))] + #[snafu(display( + "Vrl script should return object or array in the end, got `{:?}`", + result_kind + ))] VrlReturnValue { + result_kind: Kind, #[snafu(implicit)] location: Location, }, @@ -695,6 +700,25 @@ pub enum Error { location: Location, }, + #[snafu(display( + "Array element at index {index} must be an object for one-to-many transformation, got {actual_type}" + ))] + ArrayElementMustBeObject { + index: usize, + actual_type: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to transform array element at index {index}: {source}"))] + TransformArrayElement { + index: usize, + #[snafu(source)] + source: Box, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to build DataFusion logical plan"))] BuildDfLogicalPlan { #[snafu(source)] @@ -792,7 +816,10 @@ impl ErrorExt for Error { | InvalidPipelineVersion { .. } | InvalidCustomTimeIndex { .. } | TimeIndexMustBeNonNull { .. } => StatusCode::InvalidArguments, - MultiPipelineWithDiffSchema { .. } | ValueMustBeMap { .. } => StatusCode::IllegalState, + MultiPipelineWithDiffSchema { .. } + | ValueMustBeMap { .. } + | ArrayElementMustBeObject { .. } => StatusCode::IllegalState, + TransformArrayElement { source, .. } => source.status_code(), BuildDfLogicalPlan { .. } | RecordBatchLenNotMatch { .. } => StatusCode::Internal, ExecuteInternalStatement { source, .. } => source.status_code(), DataFrame { source, .. 
} => source.status_code(), diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs index 2fe2a7ba53..dd4d540376 100644 --- a/src/pipeline/src/etl.rs +++ b/src/pipeline/src/etl.rs @@ -19,6 +19,8 @@ pub mod processor; pub mod transform; pub mod value; +use std::collections::HashMap; + use api::v1::Row; use common_time::timestamp::TimeUnit; use itertools::Itertools; @@ -30,13 +32,17 @@ use yaml_rust::{Yaml, YamlLoader}; use crate::dispatcher::{Dispatcher, Rule}; use crate::error::{ - AutoTransformOneTimestampSnafu, Error, IntermediateKeyIndexSnafu, InvalidVersionNumberSnafu, - Result, YamlLoadSnafu, YamlParseSnafu, + ArrayElementMustBeObjectSnafu, AutoTransformOneTimestampSnafu, Error, + IntermediateKeyIndexSnafu, InvalidVersionNumberSnafu, Result, TransformArrayElementSnafu, + YamlLoadSnafu, YamlParseSnafu, }; use crate::etl::processor::ProcessorKind; -use crate::etl::transform::transformer::greptime::values_to_row; +use crate::etl::transform::transformer::greptime::{RowWithTableSuffix, values_to_rows}; use crate::tablesuffix::TableSuffixTemplate; -use crate::{ContextOpt, GreptimeTransformer, IdentityTimeIndex, PipelineContext, SchemaInfo}; +use crate::{ + ContextOpt, GreptimeTransformer, IdentityTimeIndex, PipelineContext, SchemaInfo, + unwrap_or_continue_if_err, +}; const DESCRIPTION: &str = "description"; const DOC_VERSION: &str = "version"; @@ -230,21 +236,51 @@ pub enum PipelineExecOutput { Filtered, } +/// Output from a successful pipeline transformation. +/// +/// Rows are grouped by their ContextOpt, with each row having its own optional +/// table_suffix for routing to different tables when using one-to-many expansion. +/// This enables true per-row configuration options where different rows can have +/// different database settings (TTL, merge mode, etc.). #[derive(Debug)] pub struct TransformedOutput { - pub opt: ContextOpt, - pub row: Row, - pub table_suffix: Option, + /// Rows grouped by their ContextOpt, each with optional table suffix + pub rows_by_context: HashMap>, } impl PipelineExecOutput { // Note: This is a test only function, do not use it in production. - pub fn into_transformed(self) -> Option<(Row, Option)> { - if let Self::Transformed(TransformedOutput { - row, table_suffix, .. 
- }) = self - { - Some((row, table_suffix)) + pub fn into_transformed(self) -> Option> { + if let Self::Transformed(TransformedOutput { rows_by_context }) = self { + // For backward compatibility, merge all rows with a default ContextOpt + Some(rows_by_context.into_values().flatten().collect()) + } else { + None + } + } + + // New method for accessing the HashMap structure directly + pub fn into_transformed_hashmap(self) -> Option>> { + if let Self::Transformed(TransformedOutput { rows_by_context }) = self { + Some(rows_by_context) + } else { + None + } + } + + // Backward compatibility helper that returns first ContextOpt with all its rows + // or merges all rows with default ContextOpt for multi-context scenarios + pub fn into_legacy_format(self) -> Option<(ContextOpt, Vec)> { + if let Self::Transformed(TransformedOutput { rows_by_context }) = self { + if rows_by_context.len() == 1 { + let (opt, rows) = rows_by_context.into_iter().next().unwrap(); + Some((opt, rows)) + } else { + // Multiple contexts: merge all rows with default ContextOpt for test compatibility + let all_rows: Vec = + rows_by_context.into_values().flatten().collect(); + Some((ContextOpt::default(), all_rows)) + } } else { None } @@ -285,45 +321,43 @@ impl Pipeline { return Ok(PipelineExecOutput::DispatchedTo(rule.into(), val)); } - // extract the options first - // this might be a breaking change, for table_suffix is now right after the processors - let mut opt = ContextOpt::from_pipeline_map_to_opt(&mut val)?; - let table_suffix = opt.resolve_table_suffix(self.tablesuffix.as_ref(), &val); + let mut val = if val.is_array() { + val + } else { + VrlValue::Array(vec![val]) + }; - let row = match self.transformer() { + let rows_by_context = match self.transformer() { TransformerMode::GreptimeTransformer(greptime_transformer) => { - let values = greptime_transformer.transform_mut(&mut val, self.is_v1())?; - if self.is_v1() { - // v1 dont combine with auto-transform - // so return immediately - return Ok(PipelineExecOutput::Transformed(TransformedOutput { - opt, - row: Row { values }, - table_suffix, - })); - } - // continue v2 process, and set the rest fields with auto-transform - // if transformer presents, then ts has been set - values_to_row(schema_info, val, pipeline_ctx, Some(values), false)? + transform_array_elements_by_ctx( + // SAFETY: by line 326, val must be an array + val.as_array_mut().unwrap(), + greptime_transformer, + self.is_v1(), + schema_info, + pipeline_ctx, + self.tablesuffix.as_ref(), + )? } TransformerMode::AutoTransform(ts_name, time_unit) => { - // infer ts from the context - // we've check that only one timestamp should exist - - // Create pipeline context with the found timestamp let def = crate::PipelineDefinition::GreptimeIdentityPipeline(Some( IdentityTimeIndex::Epoch(ts_name.clone(), *time_unit, false), )); let n_ctx = PipelineContext::new(&def, pipeline_ctx.pipeline_param, pipeline_ctx.channel); - values_to_row(schema_info, val, &n_ctx, None, true)? + values_to_rows( + schema_info, + val, + &n_ctx, + None, + true, + self.tablesuffix.as_ref(), + )? } }; Ok(PipelineExecOutput::Transformed(TransformedOutput { - opt, - row, - table_suffix, + rows_by_context, })) } @@ -350,6 +384,65 @@ impl Pipeline { } } +/// Transforms an array of VRL values into rows grouped by their ContextOpt. +/// Each element can have its own ContextOpt for per-row configuration. 
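// Illustrative sketch (not part of this patch): consuming the map produced by the
// helper below. The angle-bracketed type parameters are elided in this rendering of
// the diff; based on the `RowWithTableSuffix` alias added in greptime.rs, the map is
// assumed to be `HashMap<ContextOpt, Vec<(Row, Option<String>)>>`.
use std::collections::HashMap;
use api::v1::Row;
use crate::ContextOpt;

fn flatten_grouped_rows(
    rows_by_context: HashMap<ContextOpt, Vec<(Row, Option<String>)>>,
) -> Vec<(ContextOpt, Row, Option<String>)> {
    // Elements that resolved to the same ContextOpt share one Vec; expand each group
    // back into per-row tuples so every row keeps its own options and its optional
    // table suffix for routing.
    rows_by_context
        .into_iter()
        .flat_map(|(opt, rows)| {
            rows.into_iter()
                .map(move |(row, suffix)| (opt.clone(), row, suffix))
        })
        .collect()
}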
+fn transform_array_elements_by_ctx( + arr: &mut [VrlValue], + transformer: &GreptimeTransformer, + is_v1: bool, + schema_info: &mut SchemaInfo, + pipeline_ctx: &PipelineContext<'_>, + tablesuffix_template: Option<&TableSuffixTemplate>, +) -> Result>> { + let skip_error = pipeline_ctx.pipeline_param.skip_error(); + let mut rows_by_context = HashMap::new(); + + for (index, element) in arr.iter_mut().enumerate() { + if !element.is_object() { + unwrap_or_continue_if_err!( + ArrayElementMustBeObjectSnafu { + index, + actual_type: element.kind_str().to_string(), + } + .fail(), + skip_error + ); + } + + let values = + unwrap_or_continue_if_err!(transformer.transform_mut(element, is_v1), skip_error); + if is_v1 { + // v1 mode: just use transformer output directly + let mut opt = unwrap_or_continue_if_err!( + ContextOpt::from_pipeline_map_to_opt(element), + skip_error + ); + let table_suffix = opt.resolve_table_suffix(tablesuffix_template, element); + rows_by_context + .entry(opt) + .or_insert_with(Vec::new) + .push((Row { values }, table_suffix)); + } else { + // v2 mode: combine with auto-transform for remaining fields + let element_rows_map = values_to_rows( + schema_info, + element.clone(), + pipeline_ctx, + Some(values), + false, + tablesuffix_template, + ) + .map_err(Box::new) + .context(TransformArrayElementSnafu { index })?; + for (k, v) in element_rows_map { + rows_by_context.entry(k).or_default().extend(v); + } + } + } + + Ok(rows_by_context) +} + pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str) -> Result { intermediate_keys .iter() @@ -361,7 +454,7 @@ pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str /// The schema_info cannot be used in auto-transform ts-infer mode for lacking the ts schema. /// /// Usage: -/// ```rust +/// ```ignore /// let (pipeline, schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); /// let pipeline_ctx = PipelineContext::new(&pipeline_def, &pipeline_param, Channel::Unknown); /// ``` @@ -382,6 +475,7 @@ macro_rules! 
setup_pipeline { (pipeline, schema_info, pipeline_def, pipeline_param) }}; } + #[cfg(test)] mod tests { use std::collections::BTreeMap; @@ -433,15 +527,16 @@ transform: ); let payload = input_value.into(); - let result = pipeline + let mut result = pipeline .exec_mut(payload, &pipeline_ctx, &mut schema_info) .unwrap() .into_transformed() .unwrap(); - assert_eq!(result.0.values[0].value_data, Some(ValueData::U32Value(1))); - assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2))); - match &result.0.values[2].value_data { + let (row, _table_suffix) = result.swap_remove(0); + assert_eq!(row.values[0].value_data, Some(ValueData::U32Value(1))); + assert_eq!(row.values[1].value_data, Some(ValueData::U32Value(2))); + match &row.values[2].value_data { Some(ValueData::TimestampNanosecondValue(v)) => { assert_ne!(v, &0); } @@ -504,7 +599,7 @@ transform: .into_transformed() .unwrap(); - assert_eq!(schema_info.schema.len(), result.0.values.len()); + assert_eq!(schema_info.schema.len(), result[0].0.values.len()); let test = [ ( ColumnDataType::String as i32, @@ -545,7 +640,7 @@ transform: let schema = pipeline.schemas().unwrap(); for i in 0..schema.len() { let schema = &schema[i]; - let value = &result.0.values[i]; + let value = &result[0].0.values[i]; assert_eq!(schema.datatype, test[i].0); assert_eq!(value.value_data, test[i].1); } @@ -595,9 +690,15 @@ transform: .unwrap() .into_transformed() .unwrap(); - assert_eq!(result.0.values[0].value_data, Some(ValueData::U32Value(1))); - assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2))); - match &result.0.values[2].value_data { + assert_eq!( + result[0].0.values[0].value_data, + Some(ValueData::U32Value(1)) + ); + assert_eq!( + result[0].0.values[1].value_data, + Some(ValueData::U32Value(2)) + ); + match &result[0].0.values[2].value_data { Some(ValueData::TimestampNanosecondValue(v)) => { assert_ne!(v, &0); } @@ -644,14 +745,14 @@ transform: let schema = pipeline.schemas().unwrap().clone(); let result = input_value.into(); - let row = pipeline + let rows_with_suffix = pipeline .exec_mut(result, &pipeline_ctx, &mut schema_info) .unwrap() .into_transformed() .unwrap(); let output = Rows { schema, - rows: vec![row.0], + rows: rows_with_suffix.into_iter().map(|(r, _)| r).collect(), }; let schemas = output.schema; @@ -804,4 +905,566 @@ transform: let r: Result = parse(&Content::Yaml(bad_yaml3)); assert!(r.is_err()); } + + /// Test one-to-many VRL pipeline expansion. + /// A VRL processor can return an array, which results in multiple output rows. 
+ #[test] + fn test_one_to_many_vrl_expansion() { + let pipeline_yaml = r#" +processors: + - epoch: + field: timestamp + resolution: ms + - vrl: + source: | + events = del(.events) + base_host = del(.host) + base_ts = del(.timestamp) + map_values(array!(events)) -> |event| { + { + "host": base_host, + "event_type": event.type, + "event_value": event.value, + "timestamp": base_ts + } + } + +transform: + - field: host + type: string + - field: event_type + type: string + - field: event_value + type: int32 + - field: timestamp + type: timestamp, ms + index: time +"#; + + let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); + + // Input with 3 events + let input_value: serde_json::Value = serde_json::from_str( + r#"{ + "host": "server1", + "timestamp": 1716668197217, + "events": [ + {"type": "cpu", "value": 80}, + {"type": "memory", "value": 60}, + {"type": "disk", "value": 45} + ] + }"#, + ) + .unwrap(); + + let payload = input_value.into(); + let result = pipeline + .exec_mut(payload, &pipeline_ctx, &mut schema_info) + .unwrap() + .into_transformed() + .unwrap(); + + // Should produce 3 rows from 1 input + assert_eq!(result.len(), 3); + + // Verify each row has correct structure + for (row, _table_suffix) in &result { + assert_eq!(row.values.len(), 4); // host, event_type, event_value, timestamp + // First value should be "server1" + assert_eq!( + row.values[0].value_data, + Some(ValueData::StringValue("server1".to_string())) + ); + // Last value should be the timestamp + assert_eq!( + row.values[3].value_data, + Some(ValueData::TimestampMillisecondValue(1716668197217)) + ); + } + + // Verify event types + let event_types: Vec<_> = result + .iter() + .map(|(r, _)| match &r.values[1].value_data { + Some(ValueData::StringValue(s)) => s.clone(), + _ => panic!("expected string"), + }) + .collect(); + assert!(event_types.contains(&"cpu".to_string())); + assert!(event_types.contains(&"memory".to_string())); + assert!(event_types.contains(&"disk".to_string())); + } + + /// Test that single object output still works (backward compatibility) + #[test] + fn test_single_object_output_unchanged() { + let pipeline_yaml = r#" +processors: + - epoch: + field: ts + resolution: ms + - vrl: + source: | + .processed = true + . 
+ +transform: + - field: name + type: string + - field: processed + type: boolean + - field: ts + type: timestamp, ms + index: time +"#; + + let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); + + let input_value: serde_json::Value = serde_json::from_str( + r#"{ + "name": "test", + "ts": 1716668197217 + }"#, + ) + .unwrap(); + + let payload = input_value.into(); + let result = pipeline + .exec_mut(payload, &pipeline_ctx, &mut schema_info) + .unwrap() + .into_transformed() + .unwrap(); + + // Should produce exactly 1 row + assert_eq!(result.len(), 1); + assert_eq!( + result[0].0.values[0].value_data, + Some(ValueData::StringValue("test".to_string())) + ); + assert_eq!( + result[0].0.values[1].value_data, + Some(ValueData::BoolValue(true)) + ); + } + + /// Test that empty array produces zero rows + #[test] + fn test_empty_array_produces_zero_rows() { + let pipeline_yaml = r#" +processors: + - vrl: + source: | + .events + +transform: + - field: value + type: int32 + - field: greptime_timestamp + type: timestamp, ns + index: time +"#; + + let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); + + let input_value: serde_json::Value = serde_json::from_str(r#"{"events": []}"#).unwrap(); + + let payload = input_value.into(); + let result = pipeline + .exec_mut(payload, &pipeline_ctx, &mut schema_info) + .unwrap() + .into_transformed() + .unwrap(); + + // Empty array should produce zero rows + assert_eq!(result.len(), 0); + } + + /// Test that array elements must be objects + #[test] + fn test_array_element_must_be_object() { + let pipeline_yaml = r#" +processors: + - vrl: + source: | + .items + +transform: + - field: value + type: int32 + - field: greptime_timestamp + type: timestamp, ns + index: time +"#; + + let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); + + // Array with non-object elements should fail + let input_value: serde_json::Value = + serde_json::from_str(r#"{"items": [1, 2, 3]}"#).unwrap(); + + let payload = input_value.into(); + let result = pipeline.exec_mut(payload, &pipeline_ctx, &mut schema_info); + + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("must be an object"), + "Expected error about non-object element, got: {}", + err_msg + ); + } + + /// Test one-to-many with table suffix from VRL hint + #[test] + fn test_one_to_many_with_table_suffix_hint() { + let pipeline_yaml = r#" +processors: + - epoch: + field: ts + resolution: ms + - vrl: + source: | + .greptime_table_suffix = "_" + string!(.category) + . 
+ +transform: + - field: name + type: string + - field: category + type: string + - field: ts + type: timestamp, ms + index: time +"#; + + let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); + + let input_value: serde_json::Value = serde_json::from_str( + r#"{ + "name": "test", + "category": "metrics", + "ts": 1716668197217 + }"#, + ) + .unwrap(); + + let payload = input_value.into(); + let result = pipeline + .exec_mut(payload, &pipeline_ctx, &mut schema_info) + .unwrap() + .into_transformed() + .unwrap(); + + // Should have table suffix extracted per row + assert_eq!(result.len(), 1); + assert_eq!(result[0].1, Some("_metrics".to_string())); + } + + /// Test one-to-many with per-row table suffix + #[test] + fn test_one_to_many_per_row_table_suffix() { + let pipeline_yaml = r#" +processors: + - epoch: + field: timestamp + resolution: ms + - vrl: + source: | + events = del(.events) + base_ts = del(.timestamp) + + map_values(array!(events)) -> |event| { + suffix = "_" + string!(event.category) + { + "name": event.name, + "value": event.value, + "timestamp": base_ts, + "greptime_table_suffix": suffix + } + } + +transform: + - field: name + type: string + - field: value + type: int32 + - field: timestamp + type: timestamp, ms + index: time +"#; + + let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); + + // Input with events that should go to different tables + let input_value: serde_json::Value = serde_json::from_str( + r#"{ + "timestamp": 1716668197217, + "events": [ + {"name": "cpu_usage", "value": 80, "category": "cpu"}, + {"name": "mem_usage", "value": 60, "category": "memory"}, + {"name": "cpu_temp", "value": 45, "category": "cpu"} + ] + }"#, + ) + .unwrap(); + + let payload = input_value.into(); + let result = pipeline + .exec_mut(payload, &pipeline_ctx, &mut schema_info) + .unwrap() + .into_transformed() + .unwrap(); + + // Should produce 3 rows + assert_eq!(result.len(), 3); + + // Collect table suffixes + let table_suffixes: Vec<_> = result.iter().map(|(_, suffix)| suffix.clone()).collect(); + + // Should have different table suffixes per row + assert!(table_suffixes.contains(&Some("_cpu".to_string()))); + assert!(table_suffixes.contains(&Some("_memory".to_string()))); + + // Count rows per table suffix + let cpu_count = table_suffixes + .iter() + .filter(|s| *s == &Some("_cpu".to_string())) + .count(); + let memory_count = table_suffixes + .iter() + .filter(|s| *s == &Some("_memory".to_string())) + .count(); + assert_eq!(cpu_count, 2); + assert_eq!(memory_count, 1); + } + + /// Test that one-to-many mapping preserves per-row ContextOpt in HashMap + #[test] + fn test_one_to_many_hashmap_contextopt_preservation() { + let pipeline_yaml = r#" +processors: + - epoch: + field: timestamp + resolution: ms + - vrl: + source: | + events = del(.events) + base_ts = del(.timestamp) + + map_values(array!(events)) -> |event| { + # Set different TTL values per event type + ttl = if event.type == "critical" { + "1h" + } else if event.type == "warning" { + "24h" + } else { + "7d" + } + + { + "host": del(.host), + "event_type": event.type, + 
"event_value": event.value, + "timestamp": base_ts, + "greptime_ttl": ttl + } + } + +transform: + - field: host + type: string + - field: event_type + type: string + - field: event_value + type: int32 + - field: timestamp + type: timestamp, ms + index: time +"#; + + let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); + + // Input with events that should have different ContextOpt values + let input_value: serde_json::Value = serde_json::from_str( + r#"{ + "host": "server1", + "timestamp": 1716668197217, + "events": [ + {"type": "critical", "value": 100}, + {"type": "warning", "value": 50}, + {"type": "info", "value": 25} + ] + }"#, + ) + .unwrap(); + + let payload = input_value.into(); + let result = pipeline + .exec_mut(payload, &pipeline_ctx, &mut schema_info) + .unwrap(); + + // Extract the HashMap structure + let rows_by_context = result.into_transformed_hashmap().unwrap(); + + // Should have 3 different ContextOpt groups due to different TTL values + assert_eq!(rows_by_context.len(), 3); + + // Verify each ContextOpt group has exactly 1 row and different configurations + let mut context_opts = Vec::new(); + for (opt, rows) in &rows_by_context { + assert_eq!(rows.len(), 1); // Each group should have exactly 1 row + context_opts.push(opt.clone()); + } + + // ContextOpts should be different due to different TTL values + assert_ne!(context_opts[0], context_opts[1]); + assert_ne!(context_opts[1], context_opts[2]); + assert_ne!(context_opts[0], context_opts[2]); + + // Verify the rows are correctly structured + for rows in rows_by_context.values() { + for (row, _table_suffix) in rows { + assert_eq!(row.values.len(), 4); // host, event_type, event_value, timestamp + } + } + } + + /// Test that single object input still works with HashMap structure + #[test] + fn test_single_object_hashmap_compatibility() { + let pipeline_yaml = r#" +processors: + - epoch: + field: ts + resolution: ms + - vrl: + source: | + .processed = true + . 
+ +transform: + - field: name + type: string + - field: processed + type: boolean + - field: ts + type: timestamp, ms + index: time +"#; + + let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); + + let input_value: serde_json::Value = serde_json::from_str( + r#"{ + "name": "test", + "ts": 1716668197217 + }"#, + ) + .unwrap(); + + let payload = input_value.into(); + let result = pipeline + .exec_mut(payload, &pipeline_ctx, &mut schema_info) + .unwrap(); + + // Extract the HashMap structure + let rows_by_context = result.into_transformed_hashmap().unwrap(); + + // Single object should produce exactly 1 ContextOpt group + assert_eq!(rows_by_context.len(), 1); + + let (_opt, rows) = rows_by_context.into_iter().next().unwrap(); + assert_eq!(rows.len(), 1); + + // Verify the row structure + let (row, _table_suffix) = &rows[0]; + assert_eq!(row.values.len(), 3); // name, processed, timestamp + } + + /// Test that empty arrays work correctly with HashMap structure + #[test] + fn test_empty_array_hashmap() { + let pipeline_yaml = r#" +processors: + - vrl: + source: | + .events + +transform: + - field: value + type: int32 + - field: greptime_timestamp + type: timestamp, ns + index: time +"#; + + let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); + + let input_value: serde_json::Value = serde_json::from_str(r#"{"events": []}"#).unwrap(); + + let payload = input_value.into(); + let result = pipeline + .exec_mut(payload, &pipeline_ctx, &mut schema_info) + .unwrap(); + + // Extract the HashMap structure + let rows_by_context = result.into_transformed_hashmap().unwrap(); + + // Empty array should produce empty HashMap + assert_eq!(rows_by_context.len(), 0); + } } diff --git a/src/pipeline/src/etl/ctx_req.rs b/src/pipeline/src/etl/ctx_req.rs index f8fc7c11f2..23873cfdf1 100644 --- a/src/pipeline/src/etl/ctx_req.rs +++ b/src/pipeline/src/etl/ctx_req.rs @@ -57,7 +57,7 @@ const PIPELINE_HINT_PREFIX: &str = "greptime_"; /// /// The options are set in the format of hint keys. See [`PIPELINE_HINT_KEYS`]. /// It's is used as the key in [`ContextReq`] for grouping the row insert requests. 
-#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] pub struct ContextOpt { // table options, that need to be set in the query context before making row insert requests auto_create_table: Option, @@ -192,8 +192,15 @@ impl ContextReq { Self { req: req_map } } - pub fn add_row(&mut self, opt: ContextOpt, req: RowInsertRequest) { - self.req.entry(opt).or_default().push(req); + pub fn add_row(&mut self, opt: &ContextOpt, req: RowInsertRequest) { + match self.req.get_mut(opt) { + None => { + self.req.insert(opt.clone(), vec![req]); + } + Some(e) => { + e.push(req); + } + } } pub fn add_rows(&mut self, opt: ContextOpt, reqs: impl IntoIterator) { diff --git a/src/pipeline/src/etl/processor/vrl_processor.rs b/src/pipeline/src/etl/processor/vrl_processor.rs index e84f0b3e4c..20258a0427 100644 --- a/src/pipeline/src/etl/processor/vrl_processor.rs +++ b/src/pipeline/src/etl/processor/vrl_processor.rs @@ -15,7 +15,7 @@ use std::collections::BTreeMap; use chrono_tz::Tz; -use snafu::OptionExt; +use snafu::{OptionExt, ensure}; use vrl::compiler::runtime::Runtime; use vrl::compiler::{Program, TargetValue, compile}; use vrl::diagnostic::Formatter; @@ -53,9 +53,15 @@ impl VrlProcessor { // check if the return value is have regex let result_def = program.final_type_info().result; let kind = result_def.kind(); - if !kind.is_object() { - return VrlReturnValueSnafu.fail(); - } + // Check if the return type could possibly be an object or array. + // We use contains_* methods since VRL type inference may return + // a Kind that represents multiple possible types. + ensure!( + kind.contains_object() || kind.contains_array(), + VrlReturnValueSnafu { + result_kind: kind.clone(), + } + ); check_regex_output(kind)?; Ok(Self { source, program }) @@ -111,13 +117,7 @@ impl crate::etl::processor::Processor for VrlProcessor { } fn exec_mut(&self, val: VrlValue) -> Result { - let val = self.resolve(val)?; - - if let VrlValue::Object(_) = val { - Ok(val) - } else { - VrlRegexValueSnafu.fail() - } + self.resolve(val) } } diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index 6774842ef1..85494b24dc 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -37,8 +37,8 @@ use vrl::prelude::{Bytes, VrlValueConvert}; use vrl::value::{KeyString, Value as VrlValue}; use crate::error::{ - IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, Result, - TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu, + ArrayElementMustBeObjectSnafu, IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, + Result, TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu, TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu, }; use crate::etl::PipelineDocVersion; @@ -50,6 +50,9 @@ use crate::{PipelineContext, truthy, unwrap_or_continue_if_err}; const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10; +/// Row with potentially designated table suffix. 
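// Illustrative sketch (not part of this patch): how a per-row table suffix is expected
// to combine with the base table name on the write path (see
// `table_suffix_to_table_name` in src/servers/src/pipeline.rs). The helper name and
// exact behaviour below are assumptions inferred from tests expecting suffixes such as
// "_metrics" and "_http".
fn resolve_table_name(base: &str, suffix: Option<String>) -> String {
    match suffix {
        // e.g. base "metrics" + suffix "_cpu" -> "metrics_cpu"
        Some(s) => format!("{base}{s}"),
        // rows without a suffix go to the base table unchanged
        None => base.to_string(),
    }
}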
+pub type RowWithTableSuffix = (Row, Option); + /// fields not in the columns will be discarded /// to prevent automatic column creation in GreptimeDB #[derive(Debug, Clone)] @@ -363,6 +366,73 @@ fn calc_ts(p_ctx: &PipelineContext, values: &VrlValue) -> Result, + row: Option>, + need_calc_ts: bool, + tablesuffix_template: Option<&crate::tablesuffix::TableSuffixTemplate>, +) -> Result>> { + let skip_error = pipeline_ctx.pipeline_param.skip_error(); + let VrlValue::Array(arr) = values else { + // Single object: extract ContextOpt and table_suffix + let mut result = std::collections::HashMap::new(); + + let mut opt = match ContextOpt::from_pipeline_map_to_opt(&mut values) { + Ok(r) => r, + Err(e) => return if skip_error { Ok(result) } else { Err(e) }, + }; + + let table_suffix = opt.resolve_table_suffix(tablesuffix_template, &values); + let row = match values_to_row(schema_info, values, pipeline_ctx, row, need_calc_ts) { + Ok(r) => r, + Err(e) => return if skip_error { Ok(result) } else { Err(e) }, + }; + result.insert(opt, vec![(row, table_suffix)]); + return Ok(result); + }; + + let mut rows_by_context: std::collections::HashMap> = + std::collections::HashMap::new(); + for (index, mut value) in arr.into_iter().enumerate() { + if !value.is_object() { + unwrap_or_continue_if_err!( + ArrayElementMustBeObjectSnafu { + index, + actual_type: value.kind_str().to_string(), + } + .fail(), + skip_error + ); + } + + // Extract ContextOpt and table_suffix for this element + let mut opt = unwrap_or_continue_if_err!( + ContextOpt::from_pipeline_map_to_opt(&mut value), + skip_error + ); + let table_suffix = opt.resolve_table_suffix(tablesuffix_template, &value); + let transformed_row = unwrap_or_continue_if_err!( + values_to_row(schema_info, value, pipeline_ctx, row.clone(), need_calc_ts), + skip_error + ); + rows_by_context + .entry(opt) + .or_default() + .push((transformed_row, table_suffix)); + } + Ok(rows_by_context) +} + /// `need_calc_ts` happens in two cases: /// 1. full greptime_identity /// 2. 
auto-transform without transformer @@ -992,4 +1062,139 @@ mod tests { assert_eq!(flattened_object, expected); } } + + use ahash::HashMap as AHashMap; + #[test] + fn test_values_to_rows_skip_error_handling() { + let table_suffix_template: Option = None; + + // Case 1: skip_error=true, mixed valid/invalid elements + { + let schema_info = &mut SchemaInfo::default(); + let input_array = vec![ + // Valid object + serde_json::json!({"name": "Alice", "age": 25}).into(), + // Invalid element (string) + VrlValue::Bytes("invalid_string".into()), + // Valid object + serde_json::json!({"name": "Bob", "age": 30}).into(), + // Invalid element (number) + VrlValue::Integer(42), + // Valid object + serde_json::json!({"name": "Charlie", "age": 35}).into(), + ]; + + let params = GreptimePipelineParams::from_map(AHashMap::from_iter([( + "skip_error".to_string(), + "true".to_string(), + )])); + + let pipeline_ctx = PipelineContext::new( + &PipelineDefinition::GreptimeIdentityPipeline(None), + ¶ms, + Channel::Unknown, + ); + + let result = values_to_rows( + schema_info, + VrlValue::Array(input_array), + &pipeline_ctx, + None, + true, + table_suffix_template.as_ref(), + ); + + // Should succeed and only process valid objects + assert!(result.is_ok()); + let rows_by_context = result.unwrap(); + // Count total rows across all ContextOpt groups + let total_rows: usize = rows_by_context.values().map(|v| v.len()).sum(); + assert_eq!(total_rows, 3); // Only 3 valid objects + } + + // Case 2: skip_error=false, invalid elements present + { + let schema_info = &mut SchemaInfo::default(); + let input_array = vec![ + serde_json::json!({"name": "Alice", "age": 25}).into(), + VrlValue::Bytes("invalid_string".into()), // This should cause error + ]; + + let params = GreptimePipelineParams::default(); // skip_error = false + + let pipeline_ctx = PipelineContext::new( + &PipelineDefinition::GreptimeIdentityPipeline(None), + ¶ms, + Channel::Unknown, + ); + + let result = values_to_rows( + schema_info, + VrlValue::Array(input_array), + &pipeline_ctx, + None, + true, + table_suffix_template.as_ref(), + ); + + // Should fail with ArrayElementMustBeObject error + assert!(result.is_err()); + let error_msg = result.unwrap_err().to_string(); + assert!(error_msg.contains("Array element at index 1 must be an object for one-to-many transformation, got string")); + } + } + + /// Test that values_to_rows correctly groups rows by per-element ContextOpt + #[test] + fn test_values_to_rows_per_element_context_opt() { + let table_suffix_template: Option = None; + let schema_info = &mut SchemaInfo::default(); + + // Create array with elements having different TTL values (ContextOpt) + let input_array = vec![ + serde_json::json!({"name": "Alice", "greptime_ttl": "1h"}).into(), + serde_json::json!({"name": "Bob", "greptime_ttl": "1h"}).into(), + serde_json::json!({"name": "Charlie", "greptime_ttl": "24h"}).into(), + ]; + + let params = GreptimePipelineParams::default(); + let pipeline_ctx = PipelineContext::new( + &PipelineDefinition::GreptimeIdentityPipeline(None), + ¶ms, + Channel::Unknown, + ); + + let result = values_to_rows( + schema_info, + VrlValue::Array(input_array), + &pipeline_ctx, + None, + true, + table_suffix_template.as_ref(), + ); + + assert!(result.is_ok()); + let rows_by_context = result.unwrap(); + + // Should have 2 different ContextOpt groups (1h TTL and 24h TTL) + assert_eq!(rows_by_context.len(), 2); + + // Count rows per group + let total_rows: usize = rows_by_context.values().map(|v| v.len()).sum(); + assert_eq!(total_rows, 
3); + + // Verify that rows are correctly grouped by TTL + let mut ttl_1h_count = 0; + let mut ttl_24h_count = 0; + for rows in rows_by_context.values() { + // ContextOpt doesn't expose ttl directly, but we can count by group size + if rows.len() == 2 { + ttl_1h_count = rows.len(); + } else if rows.len() == 1 { + ttl_24h_count = rows.len(); + } + } + assert_eq!(ttl_1h_count, 2); // Alice and Bob with 1h TTL + assert_eq!(ttl_24h_count, 1); // Charlie with 24h TTL + } } diff --git a/src/pipeline/tests/common.rs b/src/pipeline/tests/common.rs index 09ea340235..b102bede02 100644 --- a/src/pipeline/tests/common.rs +++ b/src/pipeline/tests/common.rs @@ -35,21 +35,25 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows { match input_value { VrlValue::Array(array) => { for value in array { - let row = pipeline + let rows_with_suffix = pipeline .exec_mut(value, &pipeline_ctx, &mut schema_info) .expect("failed to exec pipeline") .into_transformed() .expect("expect transformed result "); - rows.push(row.0); + for (r, _) in rows_with_suffix { + rows.push(r); + } } } VrlValue::Object(_) => { - let row = pipeline + let rows_with_suffix = pipeline .exec_mut(input_value, &pipeline_ctx, &mut schema_info) .expect("failed to exec pipeline") .into_transformed() .expect("expect transformed result "); - rows.push(row.0); + for (r, _) in rows_with_suffix { + rows.push(r); + } } _ => { panic!("invalid input value"); diff --git a/src/pipeline/tests/pipeline.rs b/src/pipeline/tests/pipeline.rs index ca94dbe3f0..300b7431b6 100644 --- a/src/pipeline/tests/pipeline.rs +++ b/src/pipeline/tests/pipeline.rs @@ -427,7 +427,7 @@ transform: ); let stats = input_value.into(); - let row = pipeline + let rows_with_suffix = pipeline .exec_mut(stats, &pipeline_ctx, &mut schema_info) .expect("failed to exec pipeline") .into_transformed() @@ -435,7 +435,7 @@ transform: let output = Rows { schema: pipeline.schemas().unwrap().clone(), - rows: vec![row.0], + rows: rows_with_suffix.into_iter().map(|(r, _)| r).collect(), }; assert_eq!(output.rows.len(), 1); @@ -501,13 +501,13 @@ transform: ); let status = input_value.into(); - let row = pipeline + let mut rows_with_suffix = pipeline .exec_mut(status, &pipeline_ctx, &mut schema_info) .unwrap() .into_transformed() .expect("expect transformed result "); + let (row, _) = rows_with_suffix.swap_remove(0); let r = row - .0 .values .into_iter() .map(|v| v.value_data.unwrap()) @@ -616,15 +616,16 @@ transform: ); let status = input_value.into(); - let row = pipeline + let mut rows_with_suffix = pipeline .exec_mut(status, &pipeline_ctx, &mut schema_info) .unwrap() .into_transformed() .expect("expect transformed result "); + let (row, _) = rows_with_suffix.swap_remove(0); let r = row - .0 .values + .clone() .into_iter() .map(|v| v.value_data.unwrap()) .collect::>(); @@ -688,13 +689,13 @@ transform: ); let status = input_value.into(); - let row = pipeline + let mut rows_with_suffix = pipeline .exec_mut(status, &pipeline_ctx, &mut schema_info) .unwrap() .into_transformed() .expect("expect transformed result "); + let (row, _) = rows_with_suffix.swap_remove(0); let r = row - .0 .values .into_iter() .map(|v| v.value_data.unwrap()) @@ -734,14 +735,14 @@ transform: ); let status = input_value.into(); - let row = pipeline + let mut rows_with_suffix = pipeline .exec_mut(status, &pipeline_ctx, &mut schema_info) .unwrap() .into_transformed() .expect("expect transformed result "); + let (row, _) = rows_with_suffix.swap_remove(0); let r = row - .0 .values .into_iter() .map(|v| 
v.value_data.unwrap()) @@ -799,14 +800,14 @@ transform: ); let status = input_value.into(); - let row = pipeline + let mut rows_with_suffix = pipeline .exec_mut(status, &pipeline_ctx, &mut schema_info) .unwrap() .into_transformed() .expect("expect transformed result "); + let (row, _) = rows_with_suffix.swap_remove(0); let mut r = row - .0 .values .into_iter() .map(|v| v.value_data.unwrap()) @@ -846,13 +847,14 @@ transform: ); let status = input_value.into(); - let row = pipeline + let mut rows_with_suffix = pipeline .exec_mut(status, &pipeline_ctx, &mut schema_info) .unwrap() .into_transformed() .expect("expect transformed result "); - row.0.values.into_iter().for_each(|v| { + let (row, _) = rows_with_suffix.swap_remove(0); + row.values.into_iter().for_each(|v| { if let ValueData::TimestampNanosecondValue(v) = v.value_data.unwrap() { let now = chrono::Utc::now().timestamp_nanos_opt().unwrap(); assert!(now - v < 5_000_000); @@ -923,13 +925,13 @@ transform: assert_eq!(dispatched_to.pipeline.unwrap(), "access_log_pipeline"); let status = input_value2.into(); - let row = pipeline + let mut rows_with_suffix = pipeline .exec_mut(status, &pipeline_ctx, &mut schema_info) .unwrap() .into_transformed() .expect("expect transformed result "); + let (row, _) = rows_with_suffix.swap_remove(0); let r = row - .0 .values .into_iter() .map(|v| v.value_data.unwrap()) @@ -988,8 +990,8 @@ table_suffix: _${logger} .exec_mut(status, &pipeline_ctx, &mut schema_info) .unwrap(); - let (row, table_name) = exec_re.into_transformed().unwrap(); - let values = row.values; + let mut rows_with_suffix = exec_re.into_transformed().unwrap(); + let (row, table_suffix) = rows_with_suffix.swap_remove(0); let expected_values = vec![ Value { value_data: Some(ValueData::StringValue("hello world".into())), @@ -998,6 +1000,234 @@ table_suffix: _${logger} value_data: Some(ValueData::TimestampNanosecondValue(1716668197217000000)), }, ]; - assert_eq!(expected_values, values); - assert_eq!(table_name, Some("_http".to_string())); + assert_eq!(expected_values, row.values); + assert_eq!(table_suffix, Some("_http".to_string())); +} + +/// Test one-to-many pipeline expansion using VRL processor that returns an array +#[test] +fn test_one_to_many_pipeline() { + // Input: single log entry with a list of events + let input_value = serde_json::json!({ + "request_id": "req-123", + "events": [ + {"type": "click", "value": 100}, + {"type": "scroll", "value": 200}, + {"type": "submit", "value": 300} + ] + }); + + // VRL processor that expands events into separate rows using map + let pipeline_yaml = r#" +processors: + - vrl: + source: | + events = del(.events) + request_id = del(.request_id) + map_values(array!(events)) -> |event| { + { + "request_id": request_id, + "event_type": event.type, + "event_value": event.value + } + } + +transform: + - field: request_id + type: string + - field: event_type + type: string + - field: event_value + type: uint64 +"#; + + let yaml_content = Content::Yaml(pipeline_yaml); + let pipeline: Pipeline = parse(&yaml_content).expect("failed to parse pipeline"); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); + + let status = input_value.into(); + let rows_with_suffix = pipeline + .exec_mut(status, &pipeline_ctx, &mut schema_info) + .expect("failed to exec pipeline") + .into_transformed() + .expect("expect transformed result"); + + // Should produce 3 rows 
from the single input + assert_eq!(rows_with_suffix.len(), 3); + + // Row 0: click event + assert_eq!( + rows_with_suffix[0].0.values[0].value_data, + Some(StringValue("req-123".into())) + ); + assert_eq!( + rows_with_suffix[0].0.values[1].value_data, + Some(StringValue("click".into())) + ); + assert_eq!( + rows_with_suffix[0].0.values[2].value_data, + Some(U64Value(100)) + ); + + // Row 1: scroll event + assert_eq!( + rows_with_suffix[1].0.values[0].value_data, + Some(StringValue("req-123".into())) + ); + assert_eq!( + rows_with_suffix[1].0.values[1].value_data, + Some(StringValue("scroll".into())) + ); + assert_eq!( + rows_with_suffix[1].0.values[2].value_data, + Some(U64Value(200)) + ); + + // Row 2: submit event + assert_eq!( + rows_with_suffix[2].0.values[0].value_data, + Some(StringValue("req-123".into())) + ); + assert_eq!( + rows_with_suffix[2].0.values[1].value_data, + Some(StringValue("submit".into())) + ); + assert_eq!( + rows_with_suffix[2].0.values[2].value_data, + Some(U64Value(300)) + ); +} + +/// Test that single object input still works correctly (backward compatibility) +#[test] +fn test_one_to_many_single_object_unchanged() { + let input_value = serde_json::json!({ + "name": "Alice", + "age": 30 + }); + + let pipeline_yaml = r#" +processors: + - vrl: + source: | + .processed = true + . + +transform: + - field: name + type: string + - field: age + type: uint32 + - field: processed + type: boolean +"#; + + let yaml_content = Content::Yaml(pipeline_yaml); + let pipeline: Pipeline = parse(&yaml_content).expect("failed to parse pipeline"); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); + + let status = input_value.into(); + let rows_with_suffix = pipeline + .exec_mut(status, &pipeline_ctx, &mut schema_info) + .expect("failed to exec pipeline") + .into_transformed() + .expect("expect transformed result"); + + // Should produce exactly 1 row + assert_eq!(rows_with_suffix.len(), 1); + + let (row, _) = &rows_with_suffix[0]; + assert_eq!(row.values[0].value_data, Some(StringValue("Alice".into()))); + assert_eq!(row.values[1].value_data, Some(U32Value(30))); + assert_eq!(row.values[2].value_data, Some(BoolValue(true))); +} + +/// Test error handling when array contains non-object elements +#[test] +fn test_one_to_many_array_element_validation() { + let input_value = serde_json::json!({ + "items": ["string", 123, true] + }); + + // VRL that returns an array with non-object elements + let pipeline_yaml = r#" +processors: + - vrl: + source: | + .items + +transform: + - field: value + type: string +"#; + + let yaml_content = Content::Yaml(pipeline_yaml); + let pipeline: Pipeline = parse(&yaml_content).expect("failed to parse pipeline"); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); + + let status = input_value.into(); + let result = pipeline.exec_mut(status, &pipeline_ctx, &mut schema_info); + + // Should fail because array elements are not objects + assert!(result.is_err()); + let err = result.unwrap_err(); + let err_msg = err.to_string(); + assert!( + err_msg.contains("must be an object"), + "Expected 'must be an object' error, got: {}", + err_msg + ); +} + +/// Test that empty array produces zero rows +#[test] +fn test_one_to_many_empty_array() { + let 
input_value = serde_json::json!({ + "events": [] + }); + + let pipeline_yaml = r#" +processors: + - vrl: + source: | + .events + +transform: + - field: value + type: string +"#; + + let yaml_content = Content::Yaml(pipeline_yaml); + let pipeline: Pipeline = parse(&yaml_content).expect("failed to parse pipeline"); + let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline); + let pipeline_ctx = PipelineContext::new( + &pipeline_def, + &pipeline_param, + session::context::Channel::Unknown, + ); + + let status = input_value.into(); + let rows_with_suffix = pipeline + .exec_mut(status, &pipeline_ctx, &mut schema_info) + .expect("failed to exec pipeline") + .into_transformed() + .expect("expect transformed result"); + + // Empty array should produce zero rows + assert_eq!(rows_with_suffix.len(), 0); } diff --git a/src/servers/src/pipeline.rs b/src/servers/src/pipeline.rs index 5a6710f420..f1e4138e63 100644 --- a/src/servers/src/pipeline.rs +++ b/src/servers/src/pipeline.rs @@ -16,9 +16,8 @@ use std::collections::BTreeMap; use std::sync::Arc; use ahash::{HashMap, HashMapExt}; -use api::greptime_proto; use api::v1::helper::time_index_column_schema; -use api::v1::{ColumnDataType, RowInsertRequest, Rows}; +use api::v1::{ColumnDataType, RowInsertRequest, Rows, Value}; use common_time::timestamp::TimeUnit; use pipeline::{ ContextReq, DispatchedTo, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, Pipeline, PipelineContext, @@ -154,13 +153,18 @@ async fn run_custom_pipeline( let r = unwrap_or_continue_if_err!(result, skip_error); match r { - PipelineExecOutput::Transformed(TransformedOutput { - opt, - row, - table_suffix, - }) => { - let act_table_name = table_suffix_to_table_name(&table_name, table_suffix); - push_to_map!(transformed_map, (opt, act_table_name), row, arr_len); + PipelineExecOutput::Transformed(TransformedOutput { rows_by_context }) => { + // Process each ContextOpt group separately + for (opt, rows_with_suffix) in rows_by_context { + // Group rows by table name within each context + for (row, table_suffix) in rows_with_suffix { + let act_table_name = table_suffix_to_table_name(&table_name, table_suffix); + transformed_map + .entry((opt.clone(), act_table_name)) + .or_insert_with(|| Vec::with_capacity(arr_len)) + .push(row); + } + } } PipelineExecOutput::DispatchedTo(dispatched_to, val) => { push_to_map!(dispatched, dispatched_to, val, arr_len); @@ -173,22 +177,26 @@ async fn run_custom_pipeline( let mut results = ContextReq::default(); - let s_len = schema_info.schema.len(); - - // if transformed + // Process transformed outputs. Each entry in transformed_map contains + // Vec grouped by (opt, table_name). + let column_count = schema_info.schema.len(); for ((opt, table_name), mut rows) in transformed_map { - for row in rows.iter_mut() { - row.values - .resize(s_len, greptime_proto::v1::Value::default()); + // Pad rows to match final schema size (schema may have evolved during processing) + for row in &mut rows { + let diff = column_count.saturating_sub(row.values.len()); + for _ in 0..diff { + row.values.push(Value { value_data: None }); + } } + results.add_row( - opt, + &opt, RowInsertRequest { rows: Some(Rows { rows, schema: schema_info.schema.clone(), }), - table_name, + table_name: table_name.clone(), }, ); } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 2bafe469a5..67a2d85022 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -122,6 +122,7 @@ macro_rules! 
http_tests { test_pipeline_context, test_pipeline_with_vrl, test_pipeline_with_hint_vrl, + test_pipeline_one_to_many_vrl, test_pipeline_2, test_pipeline_skip_error, test_pipeline_filter, @@ -3285,6 +3286,151 @@ transform: guard.remove_all().await; } +/// Test one-to-many VRL pipeline expansion. +/// This test verifies that a VRL processor can return an array, which results in +/// multiple output rows from a single input row. +pub async fn test_pipeline_one_to_many_vrl(storage_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = + setup_test_http_app_with_frontend(storage_type, "test_pipeline_one_to_many_vrl").await; + + let client = TestClient::new(app).await; + + // Pipeline that expands events array into multiple rows + let pipeline = r#" +processors: + - date: + field: timestamp + formats: + - "%Y-%m-%d %H:%M:%S" + ignore_missing: true + - vrl: + source: | + # Extract events array and expand each event into a separate row + events = del(.events) + base_host = del(.host) + base_timestamp = del(.timestamp) + + # Map each event to a complete row object + map_values(array!(events)) -> |event| { + { + "host": base_host, + "event_type": event.type, + "event_value": event.value, + "timestamp": base_timestamp + } + } + +transform: + - field: host + type: string + - field: event_type + type: string + - field: event_value + type: int32 + - field: timestamp + type: time + index: timestamp +"#; + + // 1. create pipeline + let res = client + .post("/v1/events/pipelines/one_to_many") + .header("Content-Type", "application/x-yaml") + .body(pipeline) + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + + // 2. write data - single input with multiple events + let data_body = r#" +[ + { + "host": "server1", + "timestamp": "2024-05-25 20:16:37", + "events": [ + {"type": "cpu", "value": 80}, + {"type": "memory", "value": 60}, + {"type": "disk", "value": 45} + ] + } +] +"#; + let res = client + .post("/v1/events/logs?db=public&table=metrics&pipeline_name=one_to_many") + .header("Content-Type", "application/json") + .body(data_body) + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + + // 3. verify: one input row should produce three output rows + validate_data( + "test_pipeline_one_to_many_vrl_count", + &client, + "select count(*) from metrics", + "[[3]]", + ) + .await; + + // 4. verify the actual data + validate_data( + "test_pipeline_one_to_many_vrl_data", + &client, + "select host, event_type, event_value from metrics order by event_type", + "[[\"server1\",\"cpu\",80],[\"server1\",\"disk\",45],[\"server1\",\"memory\",60]]", + ) + .await; + + // 5. Test with multiple input rows, each producing multiple output rows + let data_body2 = r#" +[ + { + "host": "server2", + "timestamp": "2024-05-25 20:17:00", + "events": [ + {"type": "cpu", "value": 90}, + {"type": "memory", "value": 70} + ] + }, + { + "host": "server3", + "timestamp": "2024-05-25 20:18:00", + "events": [ + {"type": "cpu", "value": 50} + ] + } +] +"#; + let res = client + .post("/v1/events/logs?db=public&table=metrics&pipeline_name=one_to_many") + .header("Content-Type", "application/json") + .body(data_body2) + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + + // 6. verify total count: 3 (from first batch) + 2 + 1 = 6 rows + validate_data( + "test_pipeline_one_to_many_vrl_total_count", + &client, + "select count(*) from metrics", + "[[6]]", + ) + .await; + + // 7. 
verify rows per host + validate_data( + "test_pipeline_one_to_many_vrl_per_host", + &client, + "select host, count(*) as cnt from metrics group by host order by host", + "[[\"server1\",3],[\"server2\",2],[\"server3\",1]]", + ) + .await; + + guard.remove_all().await; +} + pub async fn test_pipeline_2(storage_type: StorageType) { common_telemetry::init_default_ut_logging(); let (app, mut guard) = setup_test_http_app_with_frontend(storage_type, "test_pipeline_2").await;
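// Illustrative sketch (not part of this patch): the row-padding step from
// run_custom_pipeline in src/servers/src/pipeline.rs, shown in isolation. Rows
// transformed early in a batch can be shorter than the final, evolved schema, so they
// are right-padded with null values before building the RowInsertRequest.
// `Vec::resize` with a default Value is equivalent to the explicit loop in the patch
// (and to the `resize` call the previous code used).
use api::v1::Value;

fn pad_row_to_schema(values: &mut Vec<Value>, column_count: usize) {
    if values.len() < column_count {
        values.resize(column_count, Value { value_data: None });
    }
}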