feat(pipeline): vrl processor (#6205)

* feat: pipeline recognizes hints from exec

* chore: rename and add test

* chore: minor improve

* chore: rename and add comments

* fix: typos

* feat: add initial impl for vrl processor

* chore: update processors to allow vrl process

* chore: remove unnecessary clone fn

* chore: group metrics

* chore: use struct in transform output enum

* test: add test for vrl

* fix: leaked conflicts

* chore: merge branch code & add check in compile

* fix: check condition
shuiyisong
2025-06-06 09:35:19 -07:00
committed by GitHub
parent 0eb9e97f79
commit a2b3ad77df
30 changed files with 1309 additions and 95 deletions

Cargo.lock (generated, 758 changed lines)

File diff suppressed because it is too large

View File

@@ -59,6 +59,7 @@ sql.workspace = true
table.workspace = true
tokio.workspace = true
urlencoding = "2.1"
vrl = "0.24"
yaml-rust = "0.4"
[dev-dependencies]
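
As context for the new dependency: vrl 0.24 is embedded as a compile-once, resolve-per-event engine. A minimal standalone sketch of that flow, using the same calls the new processor makes below (simplified to string errors; not part of the diff):

use std::collections::BTreeMap;

use chrono_tz::Tz;
use vrl::compiler::runtime::Runtime;
use vrl::compiler::{compile, TargetValue};
use vrl::diagnostic::Formatter;
use vrl::prelude::TimeZone;
use vrl::value::{Secrets, Value as VrlValue};

fn run_vrl(source: &str) -> Result<VrlValue, String> {
    // compile once up front; the resulting Program can be reused for every event
    let fns = vrl::stdlib::all();
    let compiled = compile(source, &fns).map_err(|e| Formatter::new(source, e).to_string())?;
    // each event is wrapped in a TargetValue that the program mutates in place
    let mut target = TargetValue {
        value: VrlValue::Object(BTreeMap::new()),
        metadata: VrlValue::Object(BTreeMap::new()),
        secrets: Secrets::default(),
    };
    let mut runtime = Runtime::default();
    runtime
        .resolve(&mut target, &compiled.program, &TimeZone::Named(Tz::UTC))
        .map_err(|e| e.get_expression_error().to_string())
}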

View File

@@ -24,9 +24,9 @@ fn processor_mut(
let mut result = Vec::with_capacity(input_values.len());
for v in input_values {
- let mut payload = json_to_map(v).unwrap();
+ let payload = json_to_map(v).unwrap();
let r = pipeline
- .exec_mut(&mut payload)?
+ .exec_mut(payload)?
.into_transformed()
.expect("expect transformed result ");
result.push(r.0);

View File

@@ -686,6 +686,54 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to compile VRL, {}", msg))]
CompileVrl {
msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to execute VRL, {}", msg))]
ExecuteVrl {
msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Float is not a number: {}", input_float))]
FloatNaN {
input_float: f64,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid timestamp value: {}", input))]
InvalidTimestamp {
input: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to convert bytes to utf8"))]
BytesToUtf8 {
#[snafu(source)]
error: std::string::FromUtf8Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Please don't use regex in Vrl script"))]
VrlRegexValue {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Vrl script should return `.` in the end"))]
VrlReturnValue {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to cast type, msg: {}", msg))]
CastType {
msg: String,
@@ -866,6 +914,13 @@ impl ErrorExt for Error {
| ReachedMaxNestedLevels { .. }
| RequiredTableSuffixTemplate
| InvalidTableSuffixTemplate { .. }
| CompileVrl { .. }
| ExecuteVrl { .. }
| FloatNaN { .. }
| BytesToUtf8 { .. }
| InvalidTimestamp { .. }
| VrlRegexValue { .. }
| VrlReturnValue { .. }
| PipelineMissing { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -156,7 +156,7 @@ impl DispatchedTo {
pub enum PipelineExecOutput {
Transformed(TransformedOutput),
AutoTransform(AutoTransformOutput),
- DispatchedTo(DispatchedTo),
+ DispatchedTo(DispatchedTo, PipelineMap),
}
#[derive(Debug)]
@@ -164,6 +164,7 @@ pub struct TransformedOutput {
pub opt: String,
pub row: Row,
pub table_suffix: Option<String>,
pub pipeline_map: PipelineMap,
}
#[derive(Debug)]
@@ -171,6 +172,7 @@ pub struct AutoTransformOutput {
pub table_suffix: Option<String>,
// ts_column_name -> unit
pub ts_unit_map: HashMap<String, TimeUnit>,
pub pipeline_map: PipelineMap,
}
impl PipelineExecOutput {
@@ -188,7 +190,7 @@ impl PipelineExecOutput {
// Note: This is a test only function, do not use it in production.
pub fn into_dispatched(self) -> Option<DispatchedTo> {
- if let Self::DispatchedTo(d) = self {
+ if let Self::DispatchedTo(d, _) = self {
Some(d)
} else {
None
@@ -231,30 +233,31 @@ pub fn simd_json_array_to_map(val: Vec<simd_json::OwnedValue>) -> Result<Vec<Pip
}
impl Pipeline {
- pub fn exec_mut(&self, val: &mut PipelineMap) -> Result<PipelineExecOutput> {
+ pub fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineExecOutput> {
// process
for processor in self.processors.iter() {
- processor.exec_mut(val)?;
+ val = processor.exec_mut(val)?;
}
// dispatch, fast return if matched
- if let Some(rule) = self.dispatcher.as_ref().and_then(|d| d.exec(val)) {
- return Ok(PipelineExecOutput::DispatchedTo(rule.into()));
+ if let Some(rule) = self.dispatcher.as_ref().and_then(|d| d.exec(&val)) {
+ return Ok(PipelineExecOutput::DispatchedTo(rule.into(), val));
}
if let Some(transformer) = self.transformer() {
- let (opt, row) = transformer.transform_mut(val)?;
- let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(val));
+ let (opt, row) = transformer.transform_mut(&mut val)?;
+ let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(&val));
Ok(PipelineExecOutput::Transformed(TransformedOutput {
opt,
row,
table_suffix,
pipeline_map: val,
}))
} else {
- let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(val));
+ let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(&val));
let mut ts_unit_map = HashMap::with_capacity(4);
// get all ts values
- for (k, v) in val {
+ for (k, v) in val.iter() {
if let Value::Timestamp(ts) = v {
if !ts_unit_map.contains_key(k) {
ts_unit_map.insert(k.clone(), ts.get_unit());
@@ -264,6 +267,7 @@ impl Pipeline {
Ok(PipelineExecOutput::AutoTransform(AutoTransformOutput {
table_suffix,
ts_unit_map,
pipeline_map: val,
}))
}
}
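
Since exec_mut now consumes its input, every PipelineExecOutput variant carries the map back out instead of leaving it behind a &mut borrow. A caller-side sketch (the handle_* functions are hypothetical):

let output = pipeline.exec_mut(payload)?;
match output {
    PipelineExecOutput::Transformed(TransformedOutput {
        opt,
        row,
        table_suffix,
        pipeline_map,
    }) => handle_row(opt, row, table_suffix, pipeline_map),
    PipelineExecOutput::AutoTransform(AutoTransformOutput {
        table_suffix,
        ts_unit_map,
        pipeline_map,
    }) => handle_auto(table_suffix, ts_unit_map, pipeline_map),
    // the map comes back here so it can be fed into the dispatched-to pipeline
    PipelineExecOutput::DispatchedTo(rule, pipeline_map) => handle_dispatch(rule, pipeline_map),
}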
@@ -318,9 +322,9 @@ transform:
type: uint32
"#;
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
- let mut payload = json_to_map(input_value).unwrap();
+ let payload = json_to_map(input_value).unwrap();
let result = pipeline
- .exec_mut(&mut payload)
+ .exec_mut(payload)
.unwrap()
.into_transformed()
.unwrap();
@@ -371,7 +375,7 @@ transform:
let mut payload = PipelineMap::new();
payload.insert("message".to_string(), Value::String(message));
let result = pipeline
- .exec_mut(&mut payload)
+ .exec_mut(payload)
.unwrap()
.into_transformed()
.unwrap();
@@ -446,9 +450,9 @@ transform:
"#;
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
- let mut payload = json_to_map(input_value).unwrap();
+ let payload = json_to_map(input_value).unwrap();
let result = pipeline
- .exec_mut(&mut payload)
+ .exec_mut(payload)
.unwrap()
.into_transformed()
.unwrap();
@@ -488,10 +492,10 @@ transform:
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
let schema = pipeline.schemas().unwrap().clone();
- let mut result = json_to_map(input_value).unwrap();
+ let result = json_to_map(input_value).unwrap();
let row = pipeline
- .exec_mut(&mut result)
+ .exec_mut(result)
.unwrap()
.into_transformed()
.unwrap();

View File

@@ -29,6 +29,7 @@ pub mod select;
pub mod simple_extract;
pub mod timestamp;
pub mod urlencoding;
pub mod vrl;
use std::str::FromStr;
@@ -58,6 +59,7 @@ use crate::etl::field::{Field, Fields};
use crate::etl::processor::json_parse::JsonParseProcessor;
use crate::etl::processor::select::SelectProcessor;
use crate::etl::processor::simple_extract::SimpleExtractProcessor;
use crate::etl::processor::vrl::VrlProcessor;
use crate::etl::PipelineMap;
const FIELD_NAME: &str = "field";
@@ -123,7 +125,7 @@ pub trait Processor: std::fmt::Debug + Send + Sync + 'static {
fn ignore_missing(&self) -> bool;
/// Execute the processor on a vector which has been preprocessed by the pipeline
- fn exec_mut(&self, val: &mut PipelineMap) -> Result<()>;
+ fn exec_mut(&self, val: PipelineMap) -> Result<PipelineMap>;
}
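
The owned-map signature also lets a processor replace the map wholesale rather than mutating it in place, which is exactly what the VRL processor needs. A minimal hypothetical implementation against the new trait:

#[derive(Debug)]
struct NoopProcessor;

impl Processor for NoopProcessor {
    fn kind(&self) -> &str {
        "noop"
    }

    fn ignore_missing(&self) -> bool {
        true
    }

    fn exec_mut(&self, val: PipelineMap) -> Result<PipelineMap> {
        // mutate `val` here, or build and return an entirely new map
        Ok(val)
    }
}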
#[derive(Debug)]
@@ -146,6 +148,7 @@ pub enum ProcessorKind {
Decolorize(DecolorizeProcessor),
Digest(DigestProcessor),
Select(SelectProcessor),
Vrl(VrlProcessor),
}
#[derive(Debug, Default)]
@@ -227,6 +230,7 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorKind> {
json_parse::PROCESSOR_JSON_PARSE => {
ProcessorKind::JsonParse(JsonParseProcessor::try_from(value)?)
}
vrl::PROCESSOR_VRL => ProcessorKind::Vrl(VrlProcessor::try_from(value)?),
select::PROCESSOR_SELECT => ProcessorKind::Select(SelectProcessor::try_from(value)?),
_ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
};

View File

@@ -249,7 +249,7 @@ impl Processor for CmcdProcessor {
self.ignore_missing
}
- fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
+ fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
for field in self.fields.iter() {
let name = field.input_field();
@@ -277,7 +277,7 @@ impl Processor for CmcdProcessor {
}
}
- Ok(())
+ Ok(val)
}
}

View File

@@ -189,7 +189,7 @@ impl Processor for CsvProcessor {
self.ignore_missing
}
- fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
+ fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
for field in self.fields.iter() {
let name = field.input_field();
@@ -216,7 +216,7 @@ impl Processor for CsvProcessor {
}
}
}
- Ok(())
+ Ok(val)
}
}

View File

@@ -194,7 +194,7 @@ impl Processor for DateProcessor {
self.ignore_missing
}
- fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
+ fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
@@ -221,7 +221,7 @@ impl Processor for DateProcessor {
}
}
}
- Ok(())
+ Ok(val)
}
}

View File

@@ -102,7 +102,7 @@ impl crate::etl::processor::Processor for DecolorizeProcessor {
self.ignore_missing
}
- fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
+ fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
@@ -122,7 +122,7 @@ impl crate::etl::processor::Processor for DecolorizeProcessor {
}
}
}
- Ok(())
+ Ok(val)
}
}

View File

@@ -201,7 +201,7 @@ impl crate::etl::processor::Processor for DigestProcessor {
self.ignore_missing
}
- fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
+ fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
@@ -221,7 +221,7 @@ impl crate::etl::processor::Processor for DigestProcessor {
}
}
}
- Ok(())
+ Ok(val)
}
}

View File

@@ -601,7 +601,7 @@ impl Processor for DissectProcessor {
self.ignore_missing
}
- fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
+ fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
@@ -629,7 +629,7 @@ impl Processor for DissectProcessor {
}
}
}
- Ok(())
+ Ok(val)
}
}

View File

@@ -163,7 +163,7 @@ impl Processor for EpochProcessor {
self.ignore_missing
}
- fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
+ fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
@@ -183,7 +183,7 @@ impl Processor for EpochProcessor {
}
}
}
- Ok(())
+ Ok(val)
}
}

View File

@@ -118,7 +118,7 @@ impl crate::etl::processor::Processor for GsubProcessor {
self.ignore_missing
}
- fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
+ fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
@@ -138,7 +138,7 @@ impl crate::etl::processor::Processor for GsubProcessor {
}
}
}
- Ok(())
+ Ok(val)
}
}

View File

@@ -95,7 +95,7 @@ impl Processor for JoinProcessor {
self.ignore_missing
}
- fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
+ fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
@@ -123,7 +123,7 @@ impl Processor for JoinProcessor {
}
}
- Ok(())
+ Ok(val)
}
}

View File

@@ -97,7 +97,7 @@ impl Processor for JsonParseProcessor {
self.ignore_missing
}
- fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
+ fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
@@ -117,7 +117,7 @@ impl Processor for JsonParseProcessor {
}
}
}
- Ok(())
+ Ok(val)
}
}

View File

@@ -125,7 +125,7 @@ impl Processor for JsonPathProcessor {
self.ignore_missing
}
- fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
+ fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
@@ -145,7 +145,7 @@ impl Processor for JsonPathProcessor {
}
}
}
- Ok(())
+ Ok(val)
}
}

View File

@@ -126,7 +126,7 @@ impl Processor for LetterProcessor {
self.ignore_missing
}
- fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
+ fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
@@ -154,7 +154,7 @@ impl Processor for LetterProcessor {
}
}
- Ok(())
+ Ok(val)
}
}

View File

@@ -192,7 +192,7 @@ impl Processor for RegexProcessor {
self.ignore_missing
}
- fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
+ fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
for field in self.fields.iter() {
let index = field.input_field();
let prefix = field.target_or_input_field();
@@ -220,7 +220,7 @@ impl Processor for RegexProcessor {
}
}
- Ok(())
+ Ok(val)
}
}
#[cfg(test)]

View File

@@ -96,7 +96,7 @@ impl Processor for SelectProcessor {
true
}
- fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
+ fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
match self.select_type {
SelectType::Include => {
let mut include_key_set = HashSet::with_capacity(val.len());
@@ -121,7 +121,7 @@ impl Processor for SelectProcessor {
}
}
- Ok(())
+ Ok(val)
}
}
@@ -142,8 +142,9 @@ mod test {
p.insert("hello".to_string(), Value::String("world".to_string()));
p.insert("hello2".to_string(), Value::String("world2".to_string()));
- let result = processor.exec_mut(&mut p);
+ let result = processor.exec_mut(p);
assert!(result.is_ok());
+ let p = result.unwrap();
assert_eq!(p.len(), 1);
assert_eq!(p.get("hello"), Some(&Value::String("world".to_string())));
}
@@ -159,8 +160,9 @@ mod test {
p.insert("hello".to_string(), Value::String("world".to_string()));
p.insert("hello2".to_string(), Value::String("world2".to_string()));
- let result = processor.exec_mut(&mut p);
+ let result = processor.exec_mut(p);
assert!(result.is_ok());
+ let p = result.unwrap();
assert_eq!(p.len(), 1);
assert_eq!(p.get("hello3"), Some(&Value::String("world".to_string())));
}
@@ -176,8 +178,9 @@ mod test {
p.insert("hello".to_string(), Value::String("world".to_string()));
p.insert("hello2".to_string(), Value::String("world2".to_string()));
- let result = processor.exec_mut(&mut p);
+ let result = processor.exec_mut(p);
assert!(result.is_ok());
+ let p = result.unwrap();
assert_eq!(p.len(), 1);
assert_eq!(p.get("hello"), None);
assert_eq!(p.get("hello2"), Some(&Value::String("world2".to_string())));

View File

@@ -98,7 +98,7 @@ impl Processor for SimpleExtractProcessor {
self.ignore_missing
}
- fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
+ fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
@@ -118,7 +118,7 @@ impl Processor for SimpleExtractProcessor {
}
}
}
- Ok(())
+ Ok(val)
}
}

View File

@@ -298,7 +298,7 @@ impl Processor for TimestampProcessor {
self.ignore_missing
}
- fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
+ fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
@@ -318,7 +318,7 @@ impl Processor for TimestampProcessor {
}
}
}
- Ok(())
+ Ok(val)
}
}

View File

@@ -126,7 +126,7 @@ impl crate::etl::processor::Processor for UrlEncodingProcessor {
self.ignore_missing
}
- fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
+ fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
@@ -153,7 +153,7 @@ impl crate::etl::processor::Processor for UrlEncodingProcessor {
}
}
}
- Ok(())
+ Ok(val)
}
}

View File

@@ -0,0 +1,319 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use chrono_tz::Tz;
use snafu::{OptionExt, ResultExt};
use vrl::compiler::runtime::Runtime;
use vrl::compiler::{compile, Program, TargetValue};
use vrl::diagnostic::Formatter;
use vrl::prelude::{Bytes, NotNan, TimeZone};
use vrl::value::{KeyString, Kind, Secrets, Value as VrlValue};
use crate::error::{
BytesToUtf8Snafu, CompileVrlSnafu, Error, ExecuteVrlSnafu, FloatNaNSnafu,
InvalidTimestampSnafu, KeyMustBeStringSnafu, Result, VrlRegexValueSnafu, VrlReturnValueSnafu,
};
use crate::etl::processor::yaml_string;
use crate::{PipelineMap, Value as PipelineValue};
pub(crate) const PROCESSOR_VRL: &str = "vrl";
const SOURCE: &str = "source";
#[derive(Debug)]
pub struct VrlProcessor {
source: String,
program: Program,
}
impl VrlProcessor {
pub fn new(source: String) -> Result<Self> {
let fns = vrl::stdlib::all();
let compile_result = compile(&source, &fns).map_err(|e| {
CompileVrlSnafu {
msg: Formatter::new(&source, e).to_string(),
}
.build()
})?;
let program = compile_result.program;
// verify that the program returns an object and that the result contains no regex
let result_def = program.final_type_info().result;
let kind = result_def.kind();
if !kind.is_object() {
return VrlReturnValueSnafu.fail();
}
check_regex_output(kind)?;
Ok(Self { source, program })
}
pub fn resolve(&self, m: PipelineMap) -> Result<PipelineValue> {
let pipeline_vrl = m
.into_iter()
.map(|(k, v)| pipeline_value_to_vrl_value(v).map(|v| (KeyString::from(k), v)))
.collect::<Result<BTreeMap<_, _>>>()?;
let mut target = TargetValue {
value: VrlValue::Object(pipeline_vrl),
metadata: VrlValue::Object(BTreeMap::new()),
secrets: Secrets::default(),
};
let timezone = TimeZone::Named(Tz::UTC);
let mut runtime = Runtime::default();
let re = runtime
.resolve(&mut target, &self.program, &timezone)
.map_err(|e| {
ExecuteVrlSnafu {
msg: e.get_expression_error().to_string(),
}
.build()
})?;
vrl_value_to_pipeline_value(re)
}
}
impl TryFrom<&yaml_rust::yaml::Hash> for VrlProcessor {
type Error = Error;
fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
let mut source = String::new();
for (k, v) in value.iter() {
let key = k
.as_str()
.with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
if key == SOURCE {
source = yaml_string(v, SOURCE)?;
}
}
let processor = VrlProcessor::new(source)?;
Ok(processor)
}
}
impl crate::etl::processor::Processor for VrlProcessor {
fn kind(&self) -> &str {
PROCESSOR_VRL
}
fn ignore_missing(&self) -> bool {
true
}
fn exec_mut(&self, val: PipelineMap) -> Result<PipelineMap> {
let val = self.resolve(val)?;
if let PipelineValue::Map(m) = val {
Ok(m.values)
} else {
VrlRegexValueSnafu.fail()
}
}
}
fn pipeline_value_to_vrl_value(v: PipelineValue) -> Result<VrlValue> {
match v {
PipelineValue::Null => Ok(VrlValue::Null),
PipelineValue::Int8(x) => Ok(VrlValue::Integer(x as i64)),
PipelineValue::Int16(x) => Ok(VrlValue::Integer(x as i64)),
PipelineValue::Int32(x) => Ok(VrlValue::Integer(x as i64)),
PipelineValue::Int64(x) => Ok(VrlValue::Integer(x)),
PipelineValue::Uint8(x) => Ok(VrlValue::Integer(x as i64)),
PipelineValue::Uint16(x) => Ok(VrlValue::Integer(x as i64)),
PipelineValue::Uint32(x) => Ok(VrlValue::Integer(x as i64)),
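// note: Uint64 values above i64::MAX wrap to negative in the `as i64` cast below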
PipelineValue::Uint64(x) => Ok(VrlValue::Integer(x as i64)),
PipelineValue::Float32(x) => NotNan::new(x as f64)
.map_err(|_| FloatNaNSnafu { input_float: x }.build())
.map(VrlValue::Float),
PipelineValue::Float64(x) => NotNan::new(x)
.map_err(|_| FloatNaNSnafu { input_float: x }.build())
.map(VrlValue::Float),
PipelineValue::Boolean(x) => Ok(VrlValue::Boolean(x)),
PipelineValue::String(x) => Ok(VrlValue::Bytes(Bytes::copy_from_slice(x.as_bytes()))),
PipelineValue::Timestamp(x) => x
.to_datetime()
.context(InvalidTimestampSnafu {
input: x.to_string(),
})
.map(VrlValue::Timestamp),
PipelineValue::Array(array) => Ok(VrlValue::Array(
array
.into_iter()
.map(pipeline_value_to_vrl_value)
.collect::<Result<Vec<_>>>()?,
)),
PipelineValue::Map(m) => {
let values = m
.values
.into_iter()
.map(|(k, v)| pipeline_value_to_vrl_value(v).map(|v| (KeyString::from(k), v)))
.collect::<Result<BTreeMap<_, _>>>()?;
Ok(VrlValue::Object(values))
}
}
}
fn vrl_value_to_pipeline_value(v: VrlValue) -> Result<PipelineValue> {
match v {
VrlValue::Bytes(bytes) => String::from_utf8(bytes.to_vec())
.context(BytesToUtf8Snafu)
.map(PipelineValue::String),
VrlValue::Regex(_) => VrlRegexValueSnafu.fail(),
VrlValue::Integer(x) => Ok(PipelineValue::Int64(x)),
VrlValue::Float(not_nan) => Ok(PipelineValue::Float64(not_nan.into_inner())),
VrlValue::Boolean(b) => Ok(PipelineValue::Boolean(b)),
VrlValue::Timestamp(date_time) => crate::etl::value::Timestamp::from_datetime(date_time)
.context(InvalidTimestampSnafu {
input: date_time.to_string(),
})
.map(PipelineValue::Timestamp),
VrlValue::Object(bm) => {
let b = bm
.into_iter()
.map(|(k, v)| vrl_value_to_pipeline_value(v).map(|v| (k.to_string(), v)))
.collect::<Result<BTreeMap<String, PipelineValue>>>()?;
Ok(PipelineValue::Map(b.into()))
}
VrlValue::Array(values) => {
let a = values
.into_iter()
.map(vrl_value_to_pipeline_value)
.collect::<Result<Vec<_>>>()?;
Ok(PipelineValue::Array(a.into()))
}
VrlValue::Null => Ok(PipelineValue::Null),
}
}
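// The inferred output kind can nest a regex inside arrays or objects, so the
// check below recurses through the known element and field kinds.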
fn check_regex_output(output_kind: &Kind) -> Result<()> {
if output_kind.is_regex() {
return VrlRegexValueSnafu.fail();
}
if let Some(arr) = output_kind.as_array() {
let k = arr.known();
for v in k.values() {
check_regex_output(v)?
}
}
if let Some(obj) = output_kind.as_object() {
let k = obj.known();
for v in k.values() {
check_regex_output(v)?
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::etl::value::Timestamp;
use crate::Map;
#[test]
fn test_vrl() {
let source = r#"
.name.a = .user_info.name
.name.b = .user_info.name
del(.user_info)
.timestamp = now()
.
"#;
let v = VrlProcessor::new(source.to_string());
assert!(v.is_ok());
let v = v.unwrap();
let mut n = PipelineMap::new();
n.insert(
"name".to_string(),
PipelineValue::String("certain_name".to_string()),
);
let mut m = PipelineMap::new();
m.insert(
"user_info".to_string(),
PipelineValue::Map(Map { values: n }),
);
let re = v.resolve(m);
assert!(re.is_ok());
let re = re.unwrap();
assert!(matches!(re, PipelineValue::Map(_)));
assert!(re.get("name").is_some());
let name = re.get("name").unwrap();
assert!(matches!(name.get("a").unwrap(), PipelineValue::String(x) if x == "certain_name"));
assert!(matches!(name.get("b").unwrap(), PipelineValue::String(x) if x == "certain_name"));
assert!(re.get("timestamp").is_some());
let timestamp = re.get("timestamp").unwrap();
assert!(matches!(
timestamp,
PipelineValue::Timestamp(Timestamp::Nanosecond(_))
));
}
#[test]
fn test_yaml_to_vrl() {
let yaml = r#"
processors:
- vrl:
source: |
.name.a = .user_info.name
.name.b = .user_info.name
del(.user_info)
.timestamp = now()
.
"#;
let y = yaml_rust::YamlLoader::load_from_str(yaml).unwrap();
let vrl_processor_yaml = y
.first()
.and_then(|x| x.as_hash())
.and_then(|x| x.get(&yaml_rust::Yaml::String("processors".to_string())))
.and_then(|x| x.as_vec())
.and_then(|x| x.first())
.and_then(|x| x.as_hash())
.and_then(|x| x.get(&yaml_rust::Yaml::String("vrl".to_string())))
.and_then(|x| x.as_hash())
.unwrap();
let vrl = VrlProcessor::try_from(vrl_processor_yaml);
assert!(vrl.is_ok());
let vrl = vrl.unwrap();
assert_eq!(vrl.source, ".name.a = .user_info.name\n.name.b = .user_info.name\ndel(.user_info)\n.timestamp = now()\n.\n");
}
#[test]
fn test_regex() {
let source = r#"
.re = r'(?i)^Hello, World!$'
del(.re)
.re = r'(?i)^Hello, World!$'
.
"#;
let v = VrlProcessor::new(source.to_string());
assert!(v.is_err());
}
}
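
End to end, the new processor behaves like this; a usage sketch reusing the integration test's script (assumes the Processor trait is in scope so exec_mut resolves):

let processor = VrlProcessor::new(".log_id = .id\ndel(.id)\n.".to_string()).unwrap();

let mut input = PipelineMap::new();
input.insert("id".to_string(), PipelineValue::String("2436".to_string()));

// the returned map replaces the input map entirely
let output = processor.exec_mut(input).unwrap();
assert!(output.get("log_id").is_some());
assert!(output.get("id").is_none());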

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use chrono::{DateTime, Utc};
use common_time::timestamp::TimeUnit;
#[derive(Debug, Clone, PartialEq)]
@@ -104,6 +105,19 @@ impl Timestamp {
Timestamp::Second(_) => TimeUnit::Second,
}
}
pub fn to_datetime(&self) -> Option<DateTime<Utc>> {
match self {
Timestamp::Nanosecond(v) => Some(DateTime::from_timestamp_nanos(*v)),
Timestamp::Microsecond(v) => DateTime::from_timestamp_micros(*v),
Timestamp::Millisecond(v) => DateTime::from_timestamp_millis(*v),
Timestamp::Second(v) => DateTime::from_timestamp(*v, 0),
}
}
pub fn from_datetime(dt: DateTime<Utc>) -> Option<Self> {
dt.timestamp_nanos_opt().map(Timestamp::Nanosecond)
}
}
impl Default for Timestamp {

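A quick round-trip sketch of the two new helpers: from_datetime always normalizes to nanoseconds (timestamp_nanos_opt returns None past roughly year 2262, hence the Option), so a millisecond timestamp comes back widened:

let ts = Timestamp::Millisecond(1_716_668_197_217);
let dt = ts.to_datetime().expect("within chrono's representable range");
let back = Timestamp::from_datetime(dt).expect("fits in i64 nanoseconds");
assert_eq!(back, Timestamp::Nanosecond(1_716_668_197_217_000_000));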
View File

@@ -29,9 +29,9 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
match input_value {
serde_json::Value::Array(array) => {
for value in array {
- let mut intermediate_status = json_to_map(value).unwrap();
+ let intermediate_status = json_to_map(value).unwrap();
let row = pipeline
- .exec_mut(&mut intermediate_status)
+ .exec_mut(intermediate_status)
.expect("failed to exec pipeline")
.into_transformed()
.expect("expect transformed result ");
@@ -39,9 +39,9 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
}
}
serde_json::Value::Object(_) => {
- let mut intermediate_status = json_to_map(input_value).unwrap();
+ let intermediate_status = json_to_map(input_value).unwrap();
let row = pipeline
- .exec_mut(&mut intermediate_status)
+ .exec_mut(intermediate_status)
.expect("failed to exec pipeline")
.into_transformed()
.expect("expect transformed result ");

View File

@@ -274,9 +274,9 @@ transform:
let yaml_content = pipeline::Content::Yaml(pipeline_yaml);
let pipeline: pipeline::Pipeline =
pipeline::parse(&yaml_content).expect("failed to parse pipeline");
- let mut result = json_to_map(input_value).unwrap();
+ let result = json_to_map(input_value).unwrap();
- let row = pipeline.exec_mut(&mut result);
+ let row = pipeline.exec_mut(result);
assert!(row.is_err());
assert_eq!(row.err().unwrap().to_string(), "No matching pattern found");

View File

@@ -419,10 +419,10 @@ transform:
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline = parse(&yaml_content).expect("failed to parse pipeline");
- let mut stats = json_to_map(input_value).unwrap();
+ let stats = json_to_map(input_value).unwrap();
let row = pipeline
- .exec_mut(&mut stats)
+ .exec_mut(stats)
.expect("failed to exec pipeline")
.into_transformed()
.expect("expect transformed result ");
@@ -488,9 +488,9 @@ transform:
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline = parse(&yaml_content).unwrap();
- let mut status = json_to_map(input_value).unwrap();
+ let status = json_to_map(input_value).unwrap();
let row = pipeline
- .exec_mut(&mut status)
+ .exec_mut(status)
.unwrap()
.into_transformed()
.expect("expect transformed result ");
@@ -597,9 +597,9 @@ transform:
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline = parse(&yaml_content).unwrap();
- let mut status = json_to_map(input_value).unwrap();
+ let status = json_to_map(input_value).unwrap();
let row = pipeline
- .exec_mut(&mut status)
+ .exec_mut(status)
.unwrap()
.into_transformed()
.expect("expect transformed result ");
@@ -663,9 +663,9 @@ transform:
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline = parse(&yaml_content).unwrap();
- let mut status = json_to_map(input_value).unwrap();
+ let status = json_to_map(input_value).unwrap();
let row = pipeline
- .exec_mut(&mut status)
+ .exec_mut(status)
.unwrap()
.into_transformed()
.expect("expect transformed result ");
@@ -703,10 +703,9 @@ transform:
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline = parse(&yaml_content).unwrap();
- let mut status = json_to_map(input_value).unwrap();
+ let status = json_to_map(input_value).unwrap();
let row = pipeline
- .exec_mut(&mut status)
+ .exec_mut(status)
.unwrap()
.into_transformed()
.expect("expect transformed result ");
@@ -763,9 +762,9 @@ transform:
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline = parse(&yaml_content).unwrap();
- let mut status = json_to_map(input_value).unwrap();
+ let status = json_to_map(input_value).unwrap();
let row = pipeline
- .exec_mut(&mut status)
+ .exec_mut(status)
.unwrap()
.into_transformed()
.expect("expect transformed result ");
@@ -804,9 +803,9 @@ transform:
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline = parse(&yaml_content).unwrap();
- let mut status = json_to_map(input_value).unwrap();
+ let status = json_to_map(input_value).unwrap();
let row = pipeline
- .exec_mut(&mut status)
+ .exec_mut(status)
.unwrap()
.into_transformed()
.expect("expect transformed result ");
@@ -866,18 +865,18 @@ transform:
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline = parse(&yaml_content).unwrap();
- let mut status = json_to_map(input_value1).unwrap();
+ let status = json_to_map(input_value1).unwrap();
let dispatched_to = pipeline
- .exec_mut(&mut status)
+ .exec_mut(status)
.unwrap()
.into_dispatched()
.expect("expect dispatched result ");
assert_eq!(dispatched_to.table_suffix, "http");
assert_eq!(dispatched_to.pipeline.unwrap(), "access_log_pipeline");
- let mut status = json_to_map(input_value2).unwrap();
+ let status = json_to_map(input_value2).unwrap();
let row = pipeline
- .exec_mut(&mut status)
+ .exec_mut(status)
.unwrap()
.into_transformed()
.expect("expect transformed result ");
@@ -930,8 +929,8 @@ table_suffix: _${logger}
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline = parse(&yaml_content).unwrap();
- let mut status = json_to_map(input_value).unwrap();
- let exec_re = pipeline.exec_mut(&mut status).unwrap();
+ let status = json_to_map(input_value).unwrap();
+ let exec_re = pipeline.exec_mut(status).unwrap();
let (row, table_name) = exec_re.into_transformed().unwrap();
let values = row.values;

View File

@@ -120,9 +120,9 @@ async fn run_custom_pipeline(
let mut auto_map = HashMap::new();
let mut auto_map_ts_keys = HashMap::new();
- for mut pipeline_map in pipeline_maps {
+ for pipeline_map in pipeline_maps {
let r = pipeline
- .exec_mut(&mut pipeline_map)
+ .exec_mut(pipeline_map)
.inspect_err(|_| {
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
@@ -135,6 +135,7 @@ async fn run_custom_pipeline(
opt,
row,
table_suffix,
pipeline_map: _val,
}) => {
let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
push_to_map!(transformed_map, (opt, act_table_name), row, arr_len);
@@ -142,6 +143,7 @@ async fn run_custom_pipeline(
PipelineExecOutput::AutoTransform(AutoTransformOutput {
table_suffix,
ts_unit_map,
pipeline_map,
}) => {
let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
push_to_map!(auto_map, act_table_name.clone(), pipeline_map, arr_len);
@@ -150,8 +152,8 @@ async fn run_custom_pipeline(
.or_insert_with(HashMap::new)
.extend(ts_unit_map);
}
- PipelineExecOutput::DispatchedTo(dispatched_to) => {
- push_to_map!(dispatched, dispatched_to, pipeline_map, arr_len);
+ PipelineExecOutput::DispatchedTo(dispatched_to, val) => {
+ push_to_map!(dispatched, dispatched_to, val, arr_len);
}
}
}

View File

@@ -105,6 +105,7 @@ macro_rules! http_tests {
test_pipeline_dispatcher,
test_pipeline_suffix_template,
test_pipeline_context,
test_pipeline_with_vrl,
test_otlp_metrics,
test_otlp_traces_v0,
@@ -2133,6 +2134,74 @@ table_suffix: _${type}
guard.remove_all().await;
}
pub async fn test_pipeline_with_vrl(storage_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_http_app_with_frontend(storage_type, "test_pipeline_with_vrl").await;
// handshake
let client = TestClient::new(app).await;
let pipeline = r#"
processors:
- date:
field: time
formats:
- "%Y-%m-%d %H:%M:%S%.3f"
ignore_missing: true
- vrl:
source: |
.log_id = .id
del(.id)
.
transform:
- fields:
- log_id
type: int32
- field: time
type: time
index: timestamp
"#;
// 1. create pipeline
let res = client
.post("/v1/events/pipelines/root")
.header("Content-Type", "application/x-yaml")
.body(pipeline)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// 2. write data
let data_body = r#"
[
{
"id": "2436",
"time": "2024-05-25 20:16:37.217"
}
]
"#;
let res = client
.post("/v1/events/logs?db=public&table=d_table&pipeline_name=root")
.header("Content-Type", "application/json")
.body(data_body)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
validate_data(
"test_pipeline_with_vrl",
&client,
"select * from d_table",
"[[2436,1716668197217000000]]",
)
.await;
guard.remove_all().await;
}
pub async fn test_identity_pipeline_with_flatten(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =