feat: allow one-to-many VRL pipeline (#7342)

* feat/allow-one-to-many-pipeline:
 ### Enhance Pipeline Processing for One-to-Many Transformations

 - **Support One-to-Many Transformations**:
   - Updated `processor.rs`, `etl.rs`, `vrl_processor.rs`, and `greptime.rs` to handle one-to-many transformations by allowing VRL processors to return arrays, with each array element expanded into a separate row.
   - Introduced the `transform_array_elements` and `values_to_rows` helpers to perform this expansion (a minimal sketch of the rule follows this list).

 - **Error Handling Enhancements**:
   - Added new error types in `error.rs` to handle cases where array elements are not objects and for transformation failures.

 - **Testing Enhancements**:
   - Added tests in `pipeline.rs` to verify one-to-many transformations, single object processing, and error handling for non-object array elements.

 - **Context Management**:
   - Modified `ctx_req.rs` to clone `ContextOpt` when adding rows, ensuring correct context management during transformations.

 - **Server Pipeline Adjustments**:
   - Updated `pipeline.rs` in `servers` to handle transformed outputs with one-to-many row expansions, ensuring correct row padding and request formation.
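
 For orientation, here is a minimal, self-contained sketch of the expansion rule described above, using placeholder types rather than the crate's actual `VrlValue`/`Row`: a single object still becomes one row, an array becomes one row per object element, and non-object elements are rejected.

```rust
// Illustrative sketch only; names and types are stand-ins, not the crate's API.
use std::collections::BTreeMap;

#[derive(Debug)]
enum Value {
    Object(BTreeMap<String, String>),
    Array(Vec<Value>),
    Scalar(String),
}

type Row = BTreeMap<String, String>;

fn expand_to_rows(value: Value) -> Result<Vec<Row>, String> {
    match value {
        // A single object still produces exactly one row (backward compatible).
        Value::Object(map) => Ok(vec![map]),
        // An array produces one row per element; every element must be an object.
        Value::Array(elements) => elements
            .into_iter()
            .enumerate()
            .map(|(index, element)| match element {
                Value::Object(map) => Ok(map),
                other => Err(format!(
                    "array element at index {index} must be an object, got {other:?}"
                )),
            })
            .collect(),
        other => Err(format!(
            "pipeline output must be an object or array, got {other:?}"
        )),
    }
}

fn main() {
    let event = |kind: &str| {
        Value::Object(BTreeMap::from([(
            "event_type".to_string(),
            kind.to_string(),
        )]))
    };
    // One input value carrying three events expands into three rows.
    let rows =
        expand_to_rows(Value::Array(vec![event("cpu"), event("memory"), event("disk")])).unwrap();
    assert_eq!(rows.len(), 3);
    // A non-object element is rejected, mirroring the new ArrayElementMustBeObject error.
    assert!(expand_to_rows(Value::Array(vec![Value::Scalar("oops".into())])).is_err());
}
```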

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/allow-one-to-many-pipeline:
 Add one-to-many VRL pipeline test in `http.rs`

 - Introduced `test_pipeline_one_to_many_vrl` to verify VRL processor's ability to expand a single input row into multiple output rows.
 - Updated `http_tests!` macro to include the new test.
 - Implemented test scenarios for single and multiple input rows, ensuring correct data transformation and row count validation.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/allow-one-to-many-pipeline:
 ### Add Tests for VRL Pipeline Transformations

 - **File:** `src/pipeline/src/etl.rs`
   - Added tests for one-to-many VRL pipeline expansion to ensure multiple output rows from a single input.
   - Introduced tests to verify backward compatibility for single object output.
   - Implemented tests to confirm zero rows are produced from empty arrays.
   - Added validation tests to ensure array elements must be objects.
   - Developed tests for one-to-many transformations with table suffix hints from VRL.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/allow-one-to-many-pipeline:
 ### Enhance Pipeline Transformation with Per-Row Table Suffixes

 - **`src/pipeline/src/etl.rs`**: Updated `TransformedOutput` to include per-row table suffixes, allowing for more flexible routing of transformed data. Modified `PipelineExecOutput` and related methods to
 handle the new structure.
 - **`src/pipeline/src/etl/transform/transformer/greptime.rs`**: Enhanced `values_to_rows` to support per-row table suffix extraction and application.
 - **`src/pipeline/tests/common.rs`** and **`src/pipeline/tests/pipeline.rs`**: Adjusted tests to validate the new per-row table suffix functionality, ensuring backward compatibility and correct behavior in
 one-to-many transformations.
 - **`src/servers/src/pipeline.rs`**: Modified `run_custom_pipeline` to process transformed outputs with per-row table suffixes, grouping rows by `(opt, table_name)` for insertion.
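
 As a rough illustration of the grouping described in the last bullet, here is a self-contained sketch with plain strings standing in for `ContextOpt` and `Row` (the real logic lives in `run_custom_pipeline`): each row's optional suffix is appended to the base table name, and rows are bucketed by `(opt, table_name)` before one insert request is built per bucket.

```rust
// Sketch of the server-side grouping step; placeholder types, not the crate's.
use std::collections::HashMap;

type Opt = String;            // stand-in for ContextOpt
type Row = Vec<&'static str>; // stand-in for api::v1::Row
type RowWithTableSuffix = (Row, Option<String>);

fn group_rows(
    base_table: &str,
    rows_by_context: HashMap<Opt, Vec<RowWithTableSuffix>>,
) -> HashMap<(Opt, String), Vec<Row>> {
    let mut grouped: HashMap<(Opt, String), Vec<Row>> = HashMap::new();
    for (opt, rows) in rows_by_context {
        for (row, suffix) in rows {
            // Append the per-row suffix (if any) to the base table name.
            let table = match &suffix {
                Some(s) => format!("{base_table}{s}"),
                None => base_table.to_string(),
            };
            grouped.entry((opt.clone(), table)).or_default().push(row);
        }
    }
    grouped
}

fn main() {
    let input = HashMap::from([(
        "ttl=1h".to_string(),
        vec![
            (vec!["cpu", "80"], Some("_cpu".to_string())),
            (vec!["mem", "60"], Some("_memory".to_string())),
        ],
    )]);
    let grouped = group_rows("metrics", input);
    // Two buckets: same options, different target tables.
    assert_eq!(grouped.len(), 2);
    assert!(grouped.contains_key(&("ttl=1h".to_string(), "metrics_cpu".to_string())));
}
```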

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/allow-one-to-many-pipeline:
 ### Update VRL Processor Type Checks

 - **File:** `vrl_processor.rs`
 - **Changes:** Updated the type checking logic to use the `contains_object()` and `contains_array()` methods instead of `is_object()` and `is_array()`, so the check stays compatible with VRL type inference, which may report several possible types for the return value.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/allow-one-to-many-pipeline:
 - **Enhance Error Handling**: Added new error types `ArrayElementMustBeObjectSnafu` and `TransformArrayElementSnafu` to improve error handling in `etl.rs` and `greptime.rs`.
 - **Refactor Error Usage**: Moved the error-related `use` declarations needed by `transform_array_elements` and `values_to_rows` to the top of `etl.rs` and `greptime.rs` for better organization.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/allow-one-to-many-pipeline:
 ### Update `greptime.rs` to Enhance Error Handling

 - **Error Handling**: Modified the `values_to_rows` function to handle invalid array elements according to the `skip_error` parameter: if `skip_error` is true, invalid elements are skipped; otherwise an error is returned (see the sketch after this list).
 - **Testing**: Added unit tests in `greptime.rs` to verify the behavior of `values_to_rows` with different `skip_error` settings, ensuring correct processing of valid objects and appropriate error handling for invalid elements.
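
 A small, self-contained illustration of the `skip_error` behavior (placeholder types only, not the crate's `values_to_rows` signature):

```rust
// Invalid elements are either skipped or turned into an error, depending on the flag.
fn collect_objects(
    elements: Vec<Option<&'static str>>, // Some(_) plays the role of a valid object
    skip_error: bool,
) -> Result<Vec<&'static str>, String> {
    let mut rows = Vec::new();
    for (index, element) in elements.into_iter().enumerate() {
        match element {
            Some(obj) => rows.push(obj),
            None if skip_error => continue, // tolerate the bad element and move on
            None => return Err(format!("array element at index {index} must be an object")),
        }
    }
    Ok(rows)
}

fn main() {
    let input = vec![Some("alice"), None, Some("bob")];
    // skip_error = true: the invalid element is dropped, two rows remain.
    assert_eq!(collect_objects(input.clone(), true).unwrap().len(), 2);
    // skip_error = false: the same input is rejected.
    assert!(collect_objects(input, false).is_err());
}
```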

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/allow-one-to-many-pipeline:
 ### Commit Summary

 - **Enhance `TransformedOutput` Structure**: Refactored `TransformedOutput` to use a `HashMap` for grouping rows by `ContextOpt`, allowing per-row configuration options. Updated methods in `PipelineExecOutput` to support the new structure (`src/pipeline/src/etl.rs`); a sketch of the shape follows this list.

 - **Add New Transformation Method**: Introduced `transform_array_elements_to_hashmap` to handle array inputs with per-row `ContextOpt` in `HashMap` format (`src/pipeline/src/etl.rs`).

 - **Update Pipeline Execution**: Modified `run_custom_pipeline` to process `TransformedOutput` using the new `HashMap` structure, ensuring rows are grouped by `ContextOpt` and table name (`src/servers/src/pipeline.rs`).

 - **Add Tests for New Structure**: Implemented tests to verify the functionality of the new `HashMap` structure in `TransformedOutput`, including scenarios for one-to-many mapping, single object input, and empty arrays (`src/pipeline/src/etl.rs`).
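
 To make the new shape concrete, here is a sketch of the structure with stand-in types for `Row` and `ContextOpt` (reconstructed from the diff below; not the crate's exact definition):

```rust
use std::collections::HashMap;

type ContextOpt = String;                        // per-row options (TTL, merge mode, ...)
type Row = Vec<i64>;                             // stand-in for api::v1::Row
type RowWithTableSuffix = (Row, Option<String>); // row plus optional table suffix

struct TransformedOutput {
    // Rows grouped by their ContextOpt, each with an optional table suffix.
    rows_by_context: HashMap<ContextOpt, Vec<RowWithTableSuffix>>,
}

impl TransformedOutput {
    // Mirrors the backward-compatible accessor: merge all groups into a flat list.
    fn into_rows(self) -> Vec<RowWithTableSuffix> {
        self.rows_by_context.into_values().flatten().collect()
    }
}

fn main() {
    let output = TransformedOutput {
        rows_by_context: HashMap::from([
            ("ttl=1h".to_string(), vec![(vec![1], None), (vec![2], None)]),
            ("ttl=24h".to_string(), vec![(vec![3], Some("_cold".to_string()))]),
        ]),
    };
    assert_eq!(output.into_rows().len(), 3);
}
```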

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/allow-one-to-many-pipeline:
 ### Refactor `values_to_rows` to Return `HashMap` Grouped by `ContextOpt`

 - **`etl.rs`**:
   - Updated `values_to_rows` to return a `HashMap` grouped by `ContextOpt` instead of a vector (see the sketch after this list).
   - Adjusted logic to handle single object and array inputs, ensuring rows are grouped by their `ContextOpt`.
   - Modified functions to extract rows from default `ContextOpt` and apply table suffixes accordingly.

 - **`greptime.rs`**:
   - Enhanced `values_to_rows` to handle errors gracefully with `skip_error` logic.
   - Added logic to group rows by `ContextOpt` for array inputs.

 - **Tests**:
   - Updated existing tests to validate the new `HashMap` return structure.
   - Added a new test to verify correct grouping of rows by per-element `ContextOpt`.
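
 A compact sketch of the per-element grouping, with plain maps standing in for VRL objects and a string standing in for `ContextOpt` (the `greptime_ttl` hint key mirrors the one used in the tests):

```rust
use std::collections::{BTreeMap, HashMap};

type Element = BTreeMap<String, String>;
type Row = Element;

fn group_by_opt(elements: Vec<Element>) -> HashMap<String, Vec<Row>> {
    let mut grouped: HashMap<String, Vec<Row>> = HashMap::new();
    for mut element in elements {
        // Extract the per-element option (e.g. a TTL hint); default when absent.
        let opt = element.remove("greptime_ttl").unwrap_or_default();
        grouped.entry(opt).or_default().push(element);
    }
    grouped
}

fn main() {
    let element = |name: &str, ttl: &str| {
        BTreeMap::from([
            ("name".to_string(), name.to_string()),
            ("greptime_ttl".to_string(), ttl.to_string()),
        ])
    };
    let grouped = group_by_opt(vec![
        element("Alice", "1h"),
        element("Bob", "1h"),
        element("Charlie", "24h"),
    ]);
    // Two groups: "1h" with two rows, "24h" with one row.
    assert_eq!(grouped.len(), 2);
    assert_eq!(grouped["1h"].len(), 2);
    assert_eq!(grouped["24h"].len(), 1);
}
```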

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/allow-one-to-many-pipeline:
 ### Refactor and Enhance Error Handling in ETL Pipeline

 - **Refactored Functionality**:
   - Replaced `transform_array_elements` with `transform_array_elements_by_ctx` in `etl.rs` to streamline transformation logic and improve error handling.
   - Updated `values_to_rows` in `greptime.rs` to use `or_default` for cleaner code.

 - **Enhanced Error Handling**:
   - Introduced the `unwrap_or_continue_if_err` macro in `etl.rs` to allow skipping errors based on the pipeline context, improving robustness in data processing (a minimal sketch of the pattern follows).

 These changes enhance the maintainability and error resilience of the ETL pipeline.
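
 For reference, a minimal re-implementation of the skip-or-bail pattern that `unwrap_or_continue_if_err` provides; this is an illustrative version only, and the crate's macro may differ in detail:

```rust
// Usable only inside a loop in a function that returns Result.
macro_rules! unwrap_or_continue_if_err {
    ($result:expr, $skip_error:expr) => {
        match $result {
            Ok(value) => value,
            Err(_) if $skip_error => continue, // tolerate the bad element and move on
            Err(e) => return Err(e),           // propagate when skipping is not allowed
        }
    };
}

fn sum_parsed(inputs: &[&str], skip_error: bool) -> Result<i64, std::num::ParseIntError> {
    let mut total = 0;
    for input in inputs {
        let n: i64 = unwrap_or_continue_if_err!(input.parse(), skip_error);
        total += n;
    }
    Ok(total)
}

fn main() {
    assert_eq!(sum_parsed(&["1", "oops", "2"], true).unwrap(), 3);
    assert!(sum_parsed(&["1", "oops", "2"], false).is_err());
}
```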

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/allow-one-to-many-pipeline:
 ### Update `Row` Handling in ETL Pipeline

 - **Refactor `Row` Type**: Introduced the `RowWithTableSuffix` type alias to simplify handling of rows with optional table suffixes across the ETL pipeline (see the sketch after this list).
 - **Modify Function Signatures**: Updated function signatures in `etl.rs` and `greptime.rs` to use `RowWithTableSuffix` for better clarity and consistency.
 - **Enhance Test Coverage**: Adjusted test logic in `greptime.rs` to align with the new `RowWithTableSuffix` type, ensuring correct grouping and processing of rows by TTL.

 Files affected: `etl.rs`, `greptime.rs`.
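
 The alias itself is small; a sketch with a stand-in `Row` type shows how consumers destructure it when routing rows to tables:

```rust
type Row = Vec<u8>; // stand-in for api::v1::Row
pub type RowWithTableSuffix = (Row, Option<String>);

fn route(base: &str, (row, suffix): RowWithTableSuffix) -> (String, Row) {
    // Append the per-row suffix to the base table name when present.
    let table = suffix.map_or_else(|| base.to_string(), |s| format!("{base}{s}"));
    (table, row)
}

fn main() {
    let (table, _row) = route("metrics", (vec![1, 2, 3], Some("_cpu".to_string())));
    assert_eq!(table, "metrics_cpu");
}
```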

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Author: Lei, HUANG · committed by GitHub · 2025-12-10 14:38:44 +08:00
parent 2f9130a2de · commit 9f1aefe98f
10 changed files with 1401 additions and 111 deletions

View File

@@ -33,7 +33,7 @@ fn processor_mut(
.exec_mut(v, pipeline_ctx, schema_info)?
.into_transformed()
.expect("expect transformed result ");
result.push(r.0);
result.extend(r.into_iter().map(|v| v.0));
}
Ok(result)

View File

@@ -19,6 +19,7 @@ use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datatypes::timestamp::TimestampNanosecond;
use snafu::{Location, Snafu};
use vrl::value::Kind;
#[derive(Snafu)]
#[snafu(visibility(pub))]
@@ -676,8 +677,12 @@ pub enum Error {
location: Location,
},
#[snafu(display("Vrl script should return `.` in the end"))]
#[snafu(display(
"Vrl script should return object or array in the end, got `{:?}`",
result_kind
))]
VrlReturnValue {
result_kind: Kind,
#[snafu(implicit)]
location: Location,
},
@@ -695,6 +700,25 @@ pub enum Error {
location: Location,
},
#[snafu(display(
"Array element at index {index} must be an object for one-to-many transformation, got {actual_type}"
))]
ArrayElementMustBeObject {
index: usize,
actual_type: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to transform array element at index {index}: {source}"))]
TransformArrayElement {
index: usize,
#[snafu(source)]
source: Box<Error>,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to build DataFusion logical plan"))]
BuildDfLogicalPlan {
#[snafu(source)]
@@ -792,7 +816,10 @@ impl ErrorExt for Error {
| InvalidPipelineVersion { .. }
| InvalidCustomTimeIndex { .. }
| TimeIndexMustBeNonNull { .. } => StatusCode::InvalidArguments,
MultiPipelineWithDiffSchema { .. } | ValueMustBeMap { .. } => StatusCode::IllegalState,
MultiPipelineWithDiffSchema { .. }
| ValueMustBeMap { .. }
| ArrayElementMustBeObject { .. } => StatusCode::IllegalState,
TransformArrayElement { source, .. } => source.status_code(),
BuildDfLogicalPlan { .. } | RecordBatchLenNotMatch { .. } => StatusCode::Internal,
ExecuteInternalStatement { source, .. } => source.status_code(),
DataFrame { source, .. } => source.status_code(),

View File

@@ -19,6 +19,8 @@ pub mod processor;
pub mod transform;
pub mod value;
use std::collections::HashMap;
use api::v1::Row;
use common_time::timestamp::TimeUnit;
use itertools::Itertools;
@@ -30,13 +32,17 @@ use yaml_rust::{Yaml, YamlLoader};
use crate::dispatcher::{Dispatcher, Rule};
use crate::error::{
AutoTransformOneTimestampSnafu, Error, IntermediateKeyIndexSnafu, InvalidVersionNumberSnafu,
Result, YamlLoadSnafu, YamlParseSnafu,
ArrayElementMustBeObjectSnafu, AutoTransformOneTimestampSnafu, Error,
IntermediateKeyIndexSnafu, InvalidVersionNumberSnafu, Result, TransformArrayElementSnafu,
YamlLoadSnafu, YamlParseSnafu,
};
use crate::etl::processor::ProcessorKind;
use crate::etl::transform::transformer::greptime::values_to_row;
use crate::etl::transform::transformer::greptime::{RowWithTableSuffix, values_to_rows};
use crate::tablesuffix::TableSuffixTemplate;
use crate::{ContextOpt, GreptimeTransformer, IdentityTimeIndex, PipelineContext, SchemaInfo};
use crate::{
ContextOpt, GreptimeTransformer, IdentityTimeIndex, PipelineContext, SchemaInfo,
unwrap_or_continue_if_err,
};
const DESCRIPTION: &str = "description";
const DOC_VERSION: &str = "version";
@@ -230,21 +236,51 @@ pub enum PipelineExecOutput {
Filtered,
}
/// Output from a successful pipeline transformation.
///
/// Rows are grouped by their ContextOpt, with each row having its own optional
/// table_suffix for routing to different tables when using one-to-many expansion.
/// This enables true per-row configuration options where different rows can have
/// different database settings (TTL, merge mode, etc.).
#[derive(Debug)]
pub struct TransformedOutput {
pub opt: ContextOpt,
pub row: Row,
pub table_suffix: Option<String>,
/// Rows grouped by their ContextOpt, each with optional table suffix
pub rows_by_context: HashMap<ContextOpt, Vec<RowWithTableSuffix>>,
}
impl PipelineExecOutput {
// Note: This is a test only function, do not use it in production.
pub fn into_transformed(self) -> Option<(Row, Option<String>)> {
if let Self::Transformed(TransformedOutput {
row, table_suffix, ..
}) = self
{
Some((row, table_suffix))
pub fn into_transformed(self) -> Option<Vec<RowWithTableSuffix>> {
if let Self::Transformed(TransformedOutput { rows_by_context }) = self {
// For backward compatibility, merge all rows with a default ContextOpt
Some(rows_by_context.into_values().flatten().collect())
} else {
None
}
}
// New method for accessing the HashMap structure directly
pub fn into_transformed_hashmap(self) -> Option<HashMap<ContextOpt, Vec<RowWithTableSuffix>>> {
if let Self::Transformed(TransformedOutput { rows_by_context }) = self {
Some(rows_by_context)
} else {
None
}
}
// Backward compatibility helper that returns first ContextOpt with all its rows
// or merges all rows with default ContextOpt for multi-context scenarios
pub fn into_legacy_format(self) -> Option<(ContextOpt, Vec<RowWithTableSuffix>)> {
if let Self::Transformed(TransformedOutput { rows_by_context }) = self {
if rows_by_context.len() == 1 {
let (opt, rows) = rows_by_context.into_iter().next().unwrap();
Some((opt, rows))
} else {
// Multiple contexts: merge all rows with default ContextOpt for test compatibility
let all_rows: Vec<RowWithTableSuffix> =
rows_by_context.into_values().flatten().collect();
Some((ContextOpt::default(), all_rows))
}
} else {
None
}
@@ -285,45 +321,43 @@ impl Pipeline {
return Ok(PipelineExecOutput::DispatchedTo(rule.into(), val));
}
// extract the options first
// this might be a breaking change, for table_suffix is now right after the processors
let mut opt = ContextOpt::from_pipeline_map_to_opt(&mut val)?;
let table_suffix = opt.resolve_table_suffix(self.tablesuffix.as_ref(), &val);
let mut val = if val.is_array() {
val
} else {
VrlValue::Array(vec![val])
};
let row = match self.transformer() {
let rows_by_context = match self.transformer() {
TransformerMode::GreptimeTransformer(greptime_transformer) => {
let values = greptime_transformer.transform_mut(&mut val, self.is_v1())?;
if self.is_v1() {
// v1 dont combine with auto-transform
// so return immediately
return Ok(PipelineExecOutput::Transformed(TransformedOutput {
opt,
row: Row { values },
table_suffix,
}));
}
// continue v2 process, and set the rest fields with auto-transform
// if transformer presents, then ts has been set
values_to_row(schema_info, val, pipeline_ctx, Some(values), false)?
transform_array_elements_by_ctx(
// SAFETY: by line 326, val must be an array
val.as_array_mut().unwrap(),
greptime_transformer,
self.is_v1(),
schema_info,
pipeline_ctx,
self.tablesuffix.as_ref(),
)?
}
TransformerMode::AutoTransform(ts_name, time_unit) => {
// infer ts from the context
// we've check that only one timestamp should exist
// Create pipeline context with the found timestamp
let def = crate::PipelineDefinition::GreptimeIdentityPipeline(Some(
IdentityTimeIndex::Epoch(ts_name.clone(), *time_unit, false),
));
let n_ctx =
PipelineContext::new(&def, pipeline_ctx.pipeline_param, pipeline_ctx.channel);
values_to_row(schema_info, val, &n_ctx, None, true)?
values_to_rows(
schema_info,
val,
&n_ctx,
None,
true,
self.tablesuffix.as_ref(),
)?
}
};
Ok(PipelineExecOutput::Transformed(TransformedOutput {
opt,
row,
table_suffix,
rows_by_context,
}))
}
@@ -350,6 +384,65 @@ impl Pipeline {
}
}
/// Transforms an array of VRL values into rows grouped by their ContextOpt.
/// Each element can have its own ContextOpt for per-row configuration.
fn transform_array_elements_by_ctx(
arr: &mut [VrlValue],
transformer: &GreptimeTransformer,
is_v1: bool,
schema_info: &mut SchemaInfo,
pipeline_ctx: &PipelineContext<'_>,
tablesuffix_template: Option<&TableSuffixTemplate>,
) -> Result<HashMap<ContextOpt, Vec<RowWithTableSuffix>>> {
let skip_error = pipeline_ctx.pipeline_param.skip_error();
let mut rows_by_context = HashMap::new();
for (index, element) in arr.iter_mut().enumerate() {
if !element.is_object() {
unwrap_or_continue_if_err!(
ArrayElementMustBeObjectSnafu {
index,
actual_type: element.kind_str().to_string(),
}
.fail(),
skip_error
);
}
let values =
unwrap_or_continue_if_err!(transformer.transform_mut(element, is_v1), skip_error);
if is_v1 {
// v1 mode: just use transformer output directly
let mut opt = unwrap_or_continue_if_err!(
ContextOpt::from_pipeline_map_to_opt(element),
skip_error
);
let table_suffix = opt.resolve_table_suffix(tablesuffix_template, element);
rows_by_context
.entry(opt)
.or_insert_with(Vec::new)
.push((Row { values }, table_suffix));
} else {
// v2 mode: combine with auto-transform for remaining fields
let element_rows_map = values_to_rows(
schema_info,
element.clone(),
pipeline_ctx,
Some(values),
false,
tablesuffix_template,
)
.map_err(Box::new)
.context(TransformArrayElementSnafu { index })?;
for (k, v) in element_rows_map {
rows_by_context.entry(k).or_default().extend(v);
}
}
}
Ok(rows_by_context)
}
pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str) -> Result<usize> {
intermediate_keys
.iter()
@@ -361,7 +454,7 @@ pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str
/// The schema_info cannot be used in auto-transform ts-infer mode for lacking the ts schema.
///
/// Usage:
/// ```rust
/// ```ignore
/// let (pipeline, schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
/// let pipeline_ctx = PipelineContext::new(&pipeline_def, &pipeline_param, Channel::Unknown);
/// ```
@@ -382,6 +475,7 @@ macro_rules! setup_pipeline {
(pipeline, schema_info, pipeline_def, pipeline_param)
}};
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
@@ -433,15 +527,16 @@ transform:
);
let payload = input_value.into();
let result = pipeline
let mut result = pipeline
.exec_mut(payload, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.unwrap();
assert_eq!(result.0.values[0].value_data, Some(ValueData::U32Value(1)));
assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2)));
match &result.0.values[2].value_data {
let (row, _table_suffix) = result.swap_remove(0);
assert_eq!(row.values[0].value_data, Some(ValueData::U32Value(1)));
assert_eq!(row.values[1].value_data, Some(ValueData::U32Value(2)));
match &row.values[2].value_data {
Some(ValueData::TimestampNanosecondValue(v)) => {
assert_ne!(v, &0);
}
@@ -504,7 +599,7 @@ transform:
.into_transformed()
.unwrap();
assert_eq!(schema_info.schema.len(), result.0.values.len());
assert_eq!(schema_info.schema.len(), result[0].0.values.len());
let test = [
(
ColumnDataType::String as i32,
@@ -545,7 +640,7 @@ transform:
let schema = pipeline.schemas().unwrap();
for i in 0..schema.len() {
let schema = &schema[i];
let value = &result.0.values[i];
let value = &result[0].0.values[i];
assert_eq!(schema.datatype, test[i].0);
assert_eq!(value.value_data, test[i].1);
}
@@ -595,9 +690,15 @@ transform:
.unwrap()
.into_transformed()
.unwrap();
assert_eq!(result.0.values[0].value_data, Some(ValueData::U32Value(1)));
assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2)));
match &result.0.values[2].value_data {
assert_eq!(
result[0].0.values[0].value_data,
Some(ValueData::U32Value(1))
);
assert_eq!(
result[0].0.values[1].value_data,
Some(ValueData::U32Value(2))
);
match &result[0].0.values[2].value_data {
Some(ValueData::TimestampNanosecondValue(v)) => {
assert_ne!(v, &0);
}
@@ -644,14 +745,14 @@ transform:
let schema = pipeline.schemas().unwrap().clone();
let result = input_value.into();
let row = pipeline
let rows_with_suffix = pipeline
.exec_mut(result, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.unwrap();
let output = Rows {
schema,
rows: vec![row.0],
rows: rows_with_suffix.into_iter().map(|(r, _)| r).collect(),
};
let schemas = output.schema;
@@ -804,4 +905,566 @@ transform:
let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml3));
assert!(r.is_err());
}
/// Test one-to-many VRL pipeline expansion.
/// A VRL processor can return an array, which results in multiple output rows.
#[test]
fn test_one_to_many_vrl_expansion() {
let pipeline_yaml = r#"
processors:
- epoch:
field: timestamp
resolution: ms
- vrl:
source: |
events = del(.events)
base_host = del(.host)
base_ts = del(.timestamp)
map_values(array!(events)) -> |event| {
{
"host": base_host,
"event_type": event.type,
"event_value": event.value,
"timestamp": base_ts
}
}
transform:
- field: host
type: string
- field: event_type
type: string
- field: event_value
type: int32
- field: timestamp
type: timestamp, ms
index: time
"#;
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
// Input with 3 events
let input_value: serde_json::Value = serde_json::from_str(
r#"{
"host": "server1",
"timestamp": 1716668197217,
"events": [
{"type": "cpu", "value": 80},
{"type": "memory", "value": 60},
{"type": "disk", "value": 45}
]
}"#,
)
.unwrap();
let payload = input_value.into();
let result = pipeline
.exec_mut(payload, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.unwrap();
// Should produce 3 rows from 1 input
assert_eq!(result.len(), 3);
// Verify each row has correct structure
for (row, _table_suffix) in &result {
assert_eq!(row.values.len(), 4); // host, event_type, event_value, timestamp
// First value should be "server1"
assert_eq!(
row.values[0].value_data,
Some(ValueData::StringValue("server1".to_string()))
);
// Last value should be the timestamp
assert_eq!(
row.values[3].value_data,
Some(ValueData::TimestampMillisecondValue(1716668197217))
);
}
// Verify event types
let event_types: Vec<_> = result
.iter()
.map(|(r, _)| match &r.values[1].value_data {
Some(ValueData::StringValue(s)) => s.clone(),
_ => panic!("expected string"),
})
.collect();
assert!(event_types.contains(&"cpu".to_string()));
assert!(event_types.contains(&"memory".to_string()));
assert!(event_types.contains(&"disk".to_string()));
}
/// Test that single object output still works (backward compatibility)
#[test]
fn test_single_object_output_unchanged() {
let pipeline_yaml = r#"
processors:
- epoch:
field: ts
resolution: ms
- vrl:
source: |
.processed = true
.
transform:
- field: name
type: string
- field: processed
type: boolean
- field: ts
type: timestamp, ms
index: time
"#;
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let input_value: serde_json::Value = serde_json::from_str(
r#"{
"name": "test",
"ts": 1716668197217
}"#,
)
.unwrap();
let payload = input_value.into();
let result = pipeline
.exec_mut(payload, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.unwrap();
// Should produce exactly 1 row
assert_eq!(result.len(), 1);
assert_eq!(
result[0].0.values[0].value_data,
Some(ValueData::StringValue("test".to_string()))
);
assert_eq!(
result[0].0.values[1].value_data,
Some(ValueData::BoolValue(true))
);
}
/// Test that empty array produces zero rows
#[test]
fn test_empty_array_produces_zero_rows() {
let pipeline_yaml = r#"
processors:
- vrl:
source: |
.events
transform:
- field: value
type: int32
- field: greptime_timestamp
type: timestamp, ns
index: time
"#;
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let input_value: serde_json::Value = serde_json::from_str(r#"{"events": []}"#).unwrap();
let payload = input_value.into();
let result = pipeline
.exec_mut(payload, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.unwrap();
// Empty array should produce zero rows
assert_eq!(result.len(), 0);
}
/// Test that array elements must be objects
#[test]
fn test_array_element_must_be_object() {
let pipeline_yaml = r#"
processors:
- vrl:
source: |
.items
transform:
- field: value
type: int32
- field: greptime_timestamp
type: timestamp, ns
index: time
"#;
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
// Array with non-object elements should fail
let input_value: serde_json::Value =
serde_json::from_str(r#"{"items": [1, 2, 3]}"#).unwrap();
let payload = input_value.into();
let result = pipeline.exec_mut(payload, &pipeline_ctx, &mut schema_info);
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("must be an object"),
"Expected error about non-object element, got: {}",
err_msg
);
}
/// Test one-to-many with table suffix from VRL hint
#[test]
fn test_one_to_many_with_table_suffix_hint() {
let pipeline_yaml = r#"
processors:
- epoch:
field: ts
resolution: ms
- vrl:
source: |
.greptime_table_suffix = "_" + string!(.category)
.
transform:
- field: name
type: string
- field: category
type: string
- field: ts
type: timestamp, ms
index: time
"#;
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let input_value: serde_json::Value = serde_json::from_str(
r#"{
"name": "test",
"category": "metrics",
"ts": 1716668197217
}"#,
)
.unwrap();
let payload = input_value.into();
let result = pipeline
.exec_mut(payload, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.unwrap();
// Should have table suffix extracted per row
assert_eq!(result.len(), 1);
assert_eq!(result[0].1, Some("_metrics".to_string()));
}
/// Test one-to-many with per-row table suffix
#[test]
fn test_one_to_many_per_row_table_suffix() {
let pipeline_yaml = r#"
processors:
- epoch:
field: timestamp
resolution: ms
- vrl:
source: |
events = del(.events)
base_ts = del(.timestamp)
map_values(array!(events)) -> |event| {
suffix = "_" + string!(event.category)
{
"name": event.name,
"value": event.value,
"timestamp": base_ts,
"greptime_table_suffix": suffix
}
}
transform:
- field: name
type: string
- field: value
type: int32
- field: timestamp
type: timestamp, ms
index: time
"#;
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
// Input with events that should go to different tables
let input_value: serde_json::Value = serde_json::from_str(
r#"{
"timestamp": 1716668197217,
"events": [
{"name": "cpu_usage", "value": 80, "category": "cpu"},
{"name": "mem_usage", "value": 60, "category": "memory"},
{"name": "cpu_temp", "value": 45, "category": "cpu"}
]
}"#,
)
.unwrap();
let payload = input_value.into();
let result = pipeline
.exec_mut(payload, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.unwrap();
// Should produce 3 rows
assert_eq!(result.len(), 3);
// Collect table suffixes
let table_suffixes: Vec<_> = result.iter().map(|(_, suffix)| suffix.clone()).collect();
// Should have different table suffixes per row
assert!(table_suffixes.contains(&Some("_cpu".to_string())));
assert!(table_suffixes.contains(&Some("_memory".to_string())));
// Count rows per table suffix
let cpu_count = table_suffixes
.iter()
.filter(|s| *s == &Some("_cpu".to_string()))
.count();
let memory_count = table_suffixes
.iter()
.filter(|s| *s == &Some("_memory".to_string()))
.count();
assert_eq!(cpu_count, 2);
assert_eq!(memory_count, 1);
}
/// Test that one-to-many mapping preserves per-row ContextOpt in HashMap
#[test]
fn test_one_to_many_hashmap_contextopt_preservation() {
let pipeline_yaml = r#"
processors:
- epoch:
field: timestamp
resolution: ms
- vrl:
source: |
events = del(.events)
base_ts = del(.timestamp)
map_values(array!(events)) -> |event| {
# Set different TTL values per event type
ttl = if event.type == "critical" {
"1h"
} else if event.type == "warning" {
"24h"
} else {
"7d"
}
{
"host": del(.host),
"event_type": event.type,
"event_value": event.value,
"timestamp": base_ts,
"greptime_ttl": ttl
}
}
transform:
- field: host
type: string
- field: event_type
type: string
- field: event_value
type: int32
- field: timestamp
type: timestamp, ms
index: time
"#;
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
// Input with events that should have different ContextOpt values
let input_value: serde_json::Value = serde_json::from_str(
r#"{
"host": "server1",
"timestamp": 1716668197217,
"events": [
{"type": "critical", "value": 100},
{"type": "warning", "value": 50},
{"type": "info", "value": 25}
]
}"#,
)
.unwrap();
let payload = input_value.into();
let result = pipeline
.exec_mut(payload, &pipeline_ctx, &mut schema_info)
.unwrap();
// Extract the HashMap structure
let rows_by_context = result.into_transformed_hashmap().unwrap();
// Should have 3 different ContextOpt groups due to different TTL values
assert_eq!(rows_by_context.len(), 3);
// Verify each ContextOpt group has exactly 1 row and different configurations
let mut context_opts = Vec::new();
for (opt, rows) in &rows_by_context {
assert_eq!(rows.len(), 1); // Each group should have exactly 1 row
context_opts.push(opt.clone());
}
// ContextOpts should be different due to different TTL values
assert_ne!(context_opts[0], context_opts[1]);
assert_ne!(context_opts[1], context_opts[2]);
assert_ne!(context_opts[0], context_opts[2]);
// Verify the rows are correctly structured
for rows in rows_by_context.values() {
for (row, _table_suffix) in rows {
assert_eq!(row.values.len(), 4); // host, event_type, event_value, timestamp
}
}
}
/// Test that single object input still works with HashMap structure
#[test]
fn test_single_object_hashmap_compatibility() {
let pipeline_yaml = r#"
processors:
- epoch:
field: ts
resolution: ms
- vrl:
source: |
.processed = true
.
transform:
- field: name
type: string
- field: processed
type: boolean
- field: ts
type: timestamp, ms
index: time
"#;
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let input_value: serde_json::Value = serde_json::from_str(
r#"{
"name": "test",
"ts": 1716668197217
}"#,
)
.unwrap();
let payload = input_value.into();
let result = pipeline
.exec_mut(payload, &pipeline_ctx, &mut schema_info)
.unwrap();
// Extract the HashMap structure
let rows_by_context = result.into_transformed_hashmap().unwrap();
// Single object should produce exactly 1 ContextOpt group
assert_eq!(rows_by_context.len(), 1);
let (_opt, rows) = rows_by_context.into_iter().next().unwrap();
assert_eq!(rows.len(), 1);
// Verify the row structure
let (row, _table_suffix) = &rows[0];
assert_eq!(row.values.len(), 3); // name, processed, timestamp
}
/// Test that empty arrays work correctly with HashMap structure
#[test]
fn test_empty_array_hashmap() {
let pipeline_yaml = r#"
processors:
- vrl:
source: |
.events
transform:
- field: value
type: int32
- field: greptime_timestamp
type: timestamp, ns
index: time
"#;
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let input_value: serde_json::Value = serde_json::from_str(r#"{"events": []}"#).unwrap();
let payload = input_value.into();
let result = pipeline
.exec_mut(payload, &pipeline_ctx, &mut schema_info)
.unwrap();
// Extract the HashMap structure
let rows_by_context = result.into_transformed_hashmap().unwrap();
// Empty array should produce empty HashMap
assert_eq!(rows_by_context.len(), 0);
}
}

View File

@@ -57,7 +57,7 @@ const PIPELINE_HINT_PREFIX: &str = "greptime_";
///
/// The options are set in the format of hint keys. See [`PIPELINE_HINT_KEYS`].
/// It's is used as the key in [`ContextReq`] for grouping the row insert requests.
#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
pub struct ContextOpt {
// table options, that need to be set in the query context before making row insert requests
auto_create_table: Option<String>,
@@ -192,8 +192,15 @@ impl ContextReq {
Self { req: req_map }
}
pub fn add_row(&mut self, opt: ContextOpt, req: RowInsertRequest) {
self.req.entry(opt).or_default().push(req);
pub fn add_row(&mut self, opt: &ContextOpt, req: RowInsertRequest) {
match self.req.get_mut(opt) {
None => {
self.req.insert(opt.clone(), vec![req]);
}
Some(e) => {
e.push(req);
}
}
}
pub fn add_rows(&mut self, opt: ContextOpt, reqs: impl IntoIterator<Item = RowInsertRequest>) {

View File

@@ -15,7 +15,7 @@
use std::collections::BTreeMap;
use chrono_tz::Tz;
use snafu::OptionExt;
use snafu::{OptionExt, ensure};
use vrl::compiler::runtime::Runtime;
use vrl::compiler::{Program, TargetValue, compile};
use vrl::diagnostic::Formatter;
@@ -53,9 +53,15 @@ impl VrlProcessor {
// check if the return value is have regex
let result_def = program.final_type_info().result;
let kind = result_def.kind();
if !kind.is_object() {
return VrlReturnValueSnafu.fail();
}
// Check if the return type could possibly be an object or array.
// We use contains_* methods since VRL type inference may return
// a Kind that represents multiple possible types.
ensure!(
kind.contains_object() || kind.contains_array(),
VrlReturnValueSnafu {
result_kind: kind.clone(),
}
);
check_regex_output(kind)?;
Ok(Self { source, program })
@@ -111,13 +117,7 @@ impl crate::etl::processor::Processor for VrlProcessor {
}
fn exec_mut(&self, val: VrlValue) -> Result<VrlValue> {
let val = self.resolve(val)?;
if let VrlValue::Object(_) = val {
Ok(val)
} else {
VrlRegexValueSnafu.fail()
}
self.resolve(val)
}
}

View File

@@ -37,8 +37,8 @@ use vrl::prelude::{Bytes, VrlValueConvert};
use vrl::value::{KeyString, Value as VrlValue};
use crate::error::{
IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, Result,
TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
ArrayElementMustBeObjectSnafu, IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu,
Result, TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
};
use crate::etl::PipelineDocVersion;
@@ -50,6 +50,9 @@ use crate::{PipelineContext, truthy, unwrap_or_continue_if_err};
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;
/// Row with potentially designated table suffix.
pub type RowWithTableSuffix = (Row, Option<String>);
/// fields not in the columns will be discarded
/// to prevent automatic column creation in GreptimeDB
#[derive(Debug, Clone)]
@@ -363,6 +366,73 @@ fn calc_ts(p_ctx: &PipelineContext, values: &VrlValue) -> Result<Option<ValueDat
}
}
/// Converts VRL values to Greptime rows grouped by their ContextOpt.
/// # Returns
/// A HashMap where keys are `ContextOpt` and values are vectors of (row, table_suffix) pairs.
/// Single object input produces one ContextOpt group with one row.
/// Array input groups rows by their per-element ContextOpt values.
///
/// # Errors
/// - `ArrayElementMustBeObject` if an array element is not an object
pub(crate) fn values_to_rows(
schema_info: &mut SchemaInfo,
mut values: VrlValue,
pipeline_ctx: &PipelineContext<'_>,
row: Option<Vec<GreptimeValue>>,
need_calc_ts: bool,
tablesuffix_template: Option<&crate::tablesuffix::TableSuffixTemplate>,
) -> Result<std::collections::HashMap<ContextOpt, Vec<RowWithTableSuffix>>> {
let skip_error = pipeline_ctx.pipeline_param.skip_error();
let VrlValue::Array(arr) = values else {
// Single object: extract ContextOpt and table_suffix
let mut result = std::collections::HashMap::new();
let mut opt = match ContextOpt::from_pipeline_map_to_opt(&mut values) {
Ok(r) => r,
Err(e) => return if skip_error { Ok(result) } else { Err(e) },
};
let table_suffix = opt.resolve_table_suffix(tablesuffix_template, &values);
let row = match values_to_row(schema_info, values, pipeline_ctx, row, need_calc_ts) {
Ok(r) => r,
Err(e) => return if skip_error { Ok(result) } else { Err(e) },
};
result.insert(opt, vec![(row, table_suffix)]);
return Ok(result);
};
let mut rows_by_context: std::collections::HashMap<ContextOpt, Vec<RowWithTableSuffix>> =
std::collections::HashMap::new();
for (index, mut value) in arr.into_iter().enumerate() {
if !value.is_object() {
unwrap_or_continue_if_err!(
ArrayElementMustBeObjectSnafu {
index,
actual_type: value.kind_str().to_string(),
}
.fail(),
skip_error
);
}
// Extract ContextOpt and table_suffix for this element
let mut opt = unwrap_or_continue_if_err!(
ContextOpt::from_pipeline_map_to_opt(&mut value),
skip_error
);
let table_suffix = opt.resolve_table_suffix(tablesuffix_template, &value);
let transformed_row = unwrap_or_continue_if_err!(
values_to_row(schema_info, value, pipeline_ctx, row.clone(), need_calc_ts),
skip_error
);
rows_by_context
.entry(opt)
.or_default()
.push((transformed_row, table_suffix));
}
Ok(rows_by_context)
}
/// `need_calc_ts` happens in two cases:
/// 1. full greptime_identity
/// 2. auto-transform without transformer
@@ -992,4 +1062,139 @@ mod tests {
assert_eq!(flattened_object, expected);
}
}
use ahash::HashMap as AHashMap;
#[test]
fn test_values_to_rows_skip_error_handling() {
let table_suffix_template: Option<crate::tablesuffix::TableSuffixTemplate> = None;
// Case 1: skip_error=true, mixed valid/invalid elements
{
let schema_info = &mut SchemaInfo::default();
let input_array = vec![
// Valid object
serde_json::json!({"name": "Alice", "age": 25}).into(),
// Invalid element (string)
VrlValue::Bytes("invalid_string".into()),
// Valid object
serde_json::json!({"name": "Bob", "age": 30}).into(),
// Invalid element (number)
VrlValue::Integer(42),
// Valid object
serde_json::json!({"name": "Charlie", "age": 35}).into(),
];
let params = GreptimePipelineParams::from_map(AHashMap::from_iter([(
"skip_error".to_string(),
"true".to_string(),
)]));
let pipeline_ctx = PipelineContext::new(
&PipelineDefinition::GreptimeIdentityPipeline(None),
&params,
Channel::Unknown,
);
let result = values_to_rows(
schema_info,
VrlValue::Array(input_array),
&pipeline_ctx,
None,
true,
table_suffix_template.as_ref(),
);
// Should succeed and only process valid objects
assert!(result.is_ok());
let rows_by_context = result.unwrap();
// Count total rows across all ContextOpt groups
let total_rows: usize = rows_by_context.values().map(|v| v.len()).sum();
assert_eq!(total_rows, 3); // Only 3 valid objects
}
// Case 2: skip_error=false, invalid elements present
{
let schema_info = &mut SchemaInfo::default();
let input_array = vec![
serde_json::json!({"name": "Alice", "age": 25}).into(),
VrlValue::Bytes("invalid_string".into()), // This should cause error
];
let params = GreptimePipelineParams::default(); // skip_error = false
let pipeline_ctx = PipelineContext::new(
&PipelineDefinition::GreptimeIdentityPipeline(None),
&params,
Channel::Unknown,
);
let result = values_to_rows(
schema_info,
VrlValue::Array(input_array),
&pipeline_ctx,
None,
true,
table_suffix_template.as_ref(),
);
// Should fail with ArrayElementMustBeObject error
assert!(result.is_err());
let error_msg = result.unwrap_err().to_string();
assert!(error_msg.contains("Array element at index 1 must be an object for one-to-many transformation, got string"));
}
}
/// Test that values_to_rows correctly groups rows by per-element ContextOpt
#[test]
fn test_values_to_rows_per_element_context_opt() {
let table_suffix_template: Option<crate::tablesuffix::TableSuffixTemplate> = None;
let schema_info = &mut SchemaInfo::default();
// Create array with elements having different TTL values (ContextOpt)
let input_array = vec![
serde_json::json!({"name": "Alice", "greptime_ttl": "1h"}).into(),
serde_json::json!({"name": "Bob", "greptime_ttl": "1h"}).into(),
serde_json::json!({"name": "Charlie", "greptime_ttl": "24h"}).into(),
];
let params = GreptimePipelineParams::default();
let pipeline_ctx = PipelineContext::new(
&PipelineDefinition::GreptimeIdentityPipeline(None),
&params,
Channel::Unknown,
);
let result = values_to_rows(
schema_info,
VrlValue::Array(input_array),
&pipeline_ctx,
None,
true,
table_suffix_template.as_ref(),
);
assert!(result.is_ok());
let rows_by_context = result.unwrap();
// Should have 2 different ContextOpt groups (1h TTL and 24h TTL)
assert_eq!(rows_by_context.len(), 2);
// Count rows per group
let total_rows: usize = rows_by_context.values().map(|v| v.len()).sum();
assert_eq!(total_rows, 3);
// Verify that rows are correctly grouped by TTL
let mut ttl_1h_count = 0;
let mut ttl_24h_count = 0;
for rows in rows_by_context.values() {
// ContextOpt doesn't expose ttl directly, but we can count by group size
if rows.len() == 2 {
ttl_1h_count = rows.len();
} else if rows.len() == 1 {
ttl_24h_count = rows.len();
}
}
assert_eq!(ttl_1h_count, 2); // Alice and Bob with 1h TTL
assert_eq!(ttl_24h_count, 1); // Charlie with 24h TTL
}
}

View File

@@ -35,21 +35,25 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
match input_value {
VrlValue::Array(array) => {
for value in array {
let row = pipeline
let rows_with_suffix = pipeline
.exec_mut(value, &pipeline_ctx, &mut schema_info)
.expect("failed to exec pipeline")
.into_transformed()
.expect("expect transformed result ");
rows.push(row.0);
for (r, _) in rows_with_suffix {
rows.push(r);
}
}
}
VrlValue::Object(_) => {
let row = pipeline
let rows_with_suffix = pipeline
.exec_mut(input_value, &pipeline_ctx, &mut schema_info)
.expect("failed to exec pipeline")
.into_transformed()
.expect("expect transformed result ");
rows.push(row.0);
for (r, _) in rows_with_suffix {
rows.push(r);
}
}
_ => {
panic!("invalid input value");

View File

@@ -427,7 +427,7 @@ transform:
);
let stats = input_value.into();
let row = pipeline
let rows_with_suffix = pipeline
.exec_mut(stats, &pipeline_ctx, &mut schema_info)
.expect("failed to exec pipeline")
.into_transformed()
@@ -435,7 +435,7 @@ transform:
let output = Rows {
schema: pipeline.schemas().unwrap().clone(),
rows: vec![row.0],
rows: rows_with_suffix.into_iter().map(|(r, _)| r).collect(),
};
assert_eq!(output.rows.len(), 1);
@@ -501,13 +501,13 @@ transform:
);
let status = input_value.into();
let row = pipeline
let mut rows_with_suffix = pipeline
.exec_mut(status, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.expect("expect transformed result ");
let (row, _) = rows_with_suffix.swap_remove(0);
let r = row
.0
.values
.into_iter()
.map(|v| v.value_data.unwrap())
@@ -616,15 +616,16 @@ transform:
);
let status = input_value.into();
let row = pipeline
let mut rows_with_suffix = pipeline
.exec_mut(status, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.expect("expect transformed result ");
let (row, _) = rows_with_suffix.swap_remove(0);
let r = row
.0
.values
.clone()
.into_iter()
.map(|v| v.value_data.unwrap())
.collect::<Vec<_>>();
@@ -688,13 +689,13 @@ transform:
);
let status = input_value.into();
let row = pipeline
let mut rows_with_suffix = pipeline
.exec_mut(status, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.expect("expect transformed result ");
let (row, _) = rows_with_suffix.swap_remove(0);
let r = row
.0
.values
.into_iter()
.map(|v| v.value_data.unwrap())
@@ -734,14 +735,14 @@ transform:
);
let status = input_value.into();
let row = pipeline
let mut rows_with_suffix = pipeline
.exec_mut(status, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.expect("expect transformed result ");
let (row, _) = rows_with_suffix.swap_remove(0);
let r = row
.0
.values
.into_iter()
.map(|v| v.value_data.unwrap())
@@ -799,14 +800,14 @@ transform:
);
let status = input_value.into();
let row = pipeline
let mut rows_with_suffix = pipeline
.exec_mut(status, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.expect("expect transformed result ");
let (row, _) = rows_with_suffix.swap_remove(0);
let mut r = row
.0
.values
.into_iter()
.map(|v| v.value_data.unwrap())
@@ -846,13 +847,14 @@ transform:
);
let status = input_value.into();
let row = pipeline
let mut rows_with_suffix = pipeline
.exec_mut(status, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.expect("expect transformed result ");
row.0.values.into_iter().for_each(|v| {
let (row, _) = rows_with_suffix.swap_remove(0);
row.values.into_iter().for_each(|v| {
if let ValueData::TimestampNanosecondValue(v) = v.value_data.unwrap() {
let now = chrono::Utc::now().timestamp_nanos_opt().unwrap();
assert!(now - v < 5_000_000);
@@ -923,13 +925,13 @@ transform:
assert_eq!(dispatched_to.pipeline.unwrap(), "access_log_pipeline");
let status = input_value2.into();
let row = pipeline
let mut rows_with_suffix = pipeline
.exec_mut(status, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.expect("expect transformed result ");
let (row, _) = rows_with_suffix.swap_remove(0);
let r = row
.0
.values
.into_iter()
.map(|v| v.value_data.unwrap())
@@ -988,8 +990,8 @@ table_suffix: _${logger}
.exec_mut(status, &pipeline_ctx, &mut schema_info)
.unwrap();
let (row, table_name) = exec_re.into_transformed().unwrap();
let values = row.values;
let mut rows_with_suffix = exec_re.into_transformed().unwrap();
let (row, table_suffix) = rows_with_suffix.swap_remove(0);
let expected_values = vec![
Value {
value_data: Some(ValueData::StringValue("hello world".into())),
@@ -998,6 +1000,234 @@ table_suffix: _${logger}
value_data: Some(ValueData::TimestampNanosecondValue(1716668197217000000)),
},
];
assert_eq!(expected_values, values);
assert_eq!(table_name, Some("_http".to_string()));
assert_eq!(expected_values, row.values);
assert_eq!(table_suffix, Some("_http".to_string()));
}
/// Test one-to-many pipeline expansion using VRL processor that returns an array
#[test]
fn test_one_to_many_pipeline() {
// Input: single log entry with a list of events
let input_value = serde_json::json!({
"request_id": "req-123",
"events": [
{"type": "click", "value": 100},
{"type": "scroll", "value": 200},
{"type": "submit", "value": 300}
]
});
// VRL processor that expands events into separate rows using map
let pipeline_yaml = r#"
processors:
- vrl:
source: |
events = del(.events)
request_id = del(.request_id)
map_values(array!(events)) -> |event| {
{
"request_id": request_id,
"event_type": event.type,
"event_value": event.value
}
}
transform:
- field: request_id
type: string
- field: event_type
type: string
- field: event_value
type: uint64
"#;
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline = parse(&yaml_content).expect("failed to parse pipeline");
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let status = input_value.into();
let rows_with_suffix = pipeline
.exec_mut(status, &pipeline_ctx, &mut schema_info)
.expect("failed to exec pipeline")
.into_transformed()
.expect("expect transformed result");
// Should produce 3 rows from the single input
assert_eq!(rows_with_suffix.len(), 3);
// Row 0: click event
assert_eq!(
rows_with_suffix[0].0.values[0].value_data,
Some(StringValue("req-123".into()))
);
assert_eq!(
rows_with_suffix[0].0.values[1].value_data,
Some(StringValue("click".into()))
);
assert_eq!(
rows_with_suffix[0].0.values[2].value_data,
Some(U64Value(100))
);
// Row 1: scroll event
assert_eq!(
rows_with_suffix[1].0.values[0].value_data,
Some(StringValue("req-123".into()))
);
assert_eq!(
rows_with_suffix[1].0.values[1].value_data,
Some(StringValue("scroll".into()))
);
assert_eq!(
rows_with_suffix[1].0.values[2].value_data,
Some(U64Value(200))
);
// Row 2: submit event
assert_eq!(
rows_with_suffix[2].0.values[0].value_data,
Some(StringValue("req-123".into()))
);
assert_eq!(
rows_with_suffix[2].0.values[1].value_data,
Some(StringValue("submit".into()))
);
assert_eq!(
rows_with_suffix[2].0.values[2].value_data,
Some(U64Value(300))
);
}
/// Test that single object input still works correctly (backward compatibility)
#[test]
fn test_one_to_many_single_object_unchanged() {
let input_value = serde_json::json!({
"name": "Alice",
"age": 30
});
let pipeline_yaml = r#"
processors:
- vrl:
source: |
.processed = true
.
transform:
- field: name
type: string
- field: age
type: uint32
- field: processed
type: boolean
"#;
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline = parse(&yaml_content).expect("failed to parse pipeline");
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let status = input_value.into();
let rows_with_suffix = pipeline
.exec_mut(status, &pipeline_ctx, &mut schema_info)
.expect("failed to exec pipeline")
.into_transformed()
.expect("expect transformed result");
// Should produce exactly 1 row
assert_eq!(rows_with_suffix.len(), 1);
let (row, _) = &rows_with_suffix[0];
assert_eq!(row.values[0].value_data, Some(StringValue("Alice".into())));
assert_eq!(row.values[1].value_data, Some(U32Value(30)));
assert_eq!(row.values[2].value_data, Some(BoolValue(true)));
}
/// Test error handling when array contains non-object elements
#[test]
fn test_one_to_many_array_element_validation() {
let input_value = serde_json::json!({
"items": ["string", 123, true]
});
// VRL that returns an array with non-object elements
let pipeline_yaml = r#"
processors:
- vrl:
source: |
.items
transform:
- field: value
type: string
"#;
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline = parse(&yaml_content).expect("failed to parse pipeline");
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let status = input_value.into();
let result = pipeline.exec_mut(status, &pipeline_ctx, &mut schema_info);
// Should fail because array elements are not objects
assert!(result.is_err());
let err = result.unwrap_err();
let err_msg = err.to_string();
assert!(
err_msg.contains("must be an object"),
"Expected 'must be an object' error, got: {}",
err_msg
);
}
/// Test that empty array produces zero rows
#[test]
fn test_one_to_many_empty_array() {
let input_value = serde_json::json!({
"events": []
});
let pipeline_yaml = r#"
processors:
- vrl:
source: |
.events
transform:
- field: value
type: string
"#;
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline = parse(&yaml_content).expect("failed to parse pipeline");
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let status = input_value.into();
let rows_with_suffix = pipeline
.exec_mut(status, &pipeline_ctx, &mut schema_info)
.expect("failed to exec pipeline")
.into_transformed()
.expect("expect transformed result");
// Empty array should produce zero rows
assert_eq!(rows_with_suffix.len(), 0);
}

View File

@@ -16,9 +16,8 @@ use std::collections::BTreeMap;
use std::sync::Arc;
use ahash::{HashMap, HashMapExt};
use api::greptime_proto;
use api::v1::helper::time_index_column_schema;
use api::v1::{ColumnDataType, RowInsertRequest, Rows};
use api::v1::{ColumnDataType, RowInsertRequest, Rows, Value};
use common_time::timestamp::TimeUnit;
use pipeline::{
ContextReq, DispatchedTo, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, Pipeline, PipelineContext,
@@ -154,13 +153,18 @@ async fn run_custom_pipeline(
let r = unwrap_or_continue_if_err!(result, skip_error);
match r {
PipelineExecOutput::Transformed(TransformedOutput {
opt,
row,
table_suffix,
}) => {
let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
push_to_map!(transformed_map, (opt, act_table_name), row, arr_len);
PipelineExecOutput::Transformed(TransformedOutput { rows_by_context }) => {
// Process each ContextOpt group separately
for (opt, rows_with_suffix) in rows_by_context {
// Group rows by table name within each context
for (row, table_suffix) in rows_with_suffix {
let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
transformed_map
.entry((opt.clone(), act_table_name))
.or_insert_with(|| Vec::with_capacity(arr_len))
.push(row);
}
}
}
PipelineExecOutput::DispatchedTo(dispatched_to, val) => {
push_to_map!(dispatched, dispatched_to, val, arr_len);
@@ -173,22 +177,26 @@ async fn run_custom_pipeline(
let mut results = ContextReq::default();
let s_len = schema_info.schema.len();
// if transformed
// Process transformed outputs. Each entry in transformed_map contains
// Vec<Row> grouped by (opt, table_name).
let column_count = schema_info.schema.len();
for ((opt, table_name), mut rows) in transformed_map {
for row in rows.iter_mut() {
row.values
.resize(s_len, greptime_proto::v1::Value::default());
// Pad rows to match final schema size (schema may have evolved during processing)
for row in &mut rows {
let diff = column_count.saturating_sub(row.values.len());
for _ in 0..diff {
row.values.push(Value { value_data: None });
}
}
results.add_row(
opt,
&opt,
RowInsertRequest {
rows: Some(Rows {
rows,
schema: schema_info.schema.clone(),
}),
table_name,
table_name: table_name.clone(),
},
);
}

View File

@@ -122,6 +122,7 @@ macro_rules! http_tests {
test_pipeline_context,
test_pipeline_with_vrl,
test_pipeline_with_hint_vrl,
test_pipeline_one_to_many_vrl,
test_pipeline_2,
test_pipeline_skip_error,
test_pipeline_filter,
@@ -3285,6 +3286,151 @@ transform:
guard.remove_all().await;
}
/// Test one-to-many VRL pipeline expansion.
/// This test verifies that a VRL processor can return an array, which results in
/// multiple output rows from a single input row.
pub async fn test_pipeline_one_to_many_vrl(storage_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_http_app_with_frontend(storage_type, "test_pipeline_one_to_many_vrl").await;
let client = TestClient::new(app).await;
// Pipeline that expands events array into multiple rows
let pipeline = r#"
processors:
- date:
field: timestamp
formats:
- "%Y-%m-%d %H:%M:%S"
ignore_missing: true
- vrl:
source: |
# Extract events array and expand each event into a separate row
events = del(.events)
base_host = del(.host)
base_timestamp = del(.timestamp)
# Map each event to a complete row object
map_values(array!(events)) -> |event| {
{
"host": base_host,
"event_type": event.type,
"event_value": event.value,
"timestamp": base_timestamp
}
}
transform:
- field: host
type: string
- field: event_type
type: string
- field: event_value
type: int32
- field: timestamp
type: time
index: timestamp
"#;
// 1. create pipeline
let res = client
.post("/v1/events/pipelines/one_to_many")
.header("Content-Type", "application/x-yaml")
.body(pipeline)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// 2. write data - single input with multiple events
let data_body = r#"
[
{
"host": "server1",
"timestamp": "2024-05-25 20:16:37",
"events": [
{"type": "cpu", "value": 80},
{"type": "memory", "value": 60},
{"type": "disk", "value": 45}
]
}
]
"#;
let res = client
.post("/v1/events/logs?db=public&table=metrics&pipeline_name=one_to_many")
.header("Content-Type", "application/json")
.body(data_body)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// 3. verify: one input row should produce three output rows
validate_data(
"test_pipeline_one_to_many_vrl_count",
&client,
"select count(*) from metrics",
"[[3]]",
)
.await;
// 4. verify the actual data
validate_data(
"test_pipeline_one_to_many_vrl_data",
&client,
"select host, event_type, event_value from metrics order by event_type",
"[[\"server1\",\"cpu\",80],[\"server1\",\"disk\",45],[\"server1\",\"memory\",60]]",
)
.await;
// 5. Test with multiple input rows, each producing multiple output rows
let data_body2 = r#"
[
{
"host": "server2",
"timestamp": "2024-05-25 20:17:00",
"events": [
{"type": "cpu", "value": 90},
{"type": "memory", "value": 70}
]
},
{
"host": "server3",
"timestamp": "2024-05-25 20:18:00",
"events": [
{"type": "cpu", "value": 50}
]
}
]
"#;
let res = client
.post("/v1/events/logs?db=public&table=metrics&pipeline_name=one_to_many")
.header("Content-Type", "application/json")
.body(data_body2)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// 6. verify total count: 3 (from first batch) + 2 + 1 = 6 rows
validate_data(
"test_pipeline_one_to_many_vrl_total_count",
&client,
"select count(*) from metrics",
"[[6]]",
)
.await;
// 7. verify rows per host
validate_data(
"test_pipeline_one_to_many_vrl_per_host",
&client,
"select host, count(*) as cnt from metrics group by host order by host",
"[[\"server1\",3],[\"server2\",2],[\"server3\",1]]",
)
.await;
guard.remove_all().await;
}
pub async fn test_pipeline_2(storage_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_http_app_with_frontend(storage_type, "test_pipeline_2").await;