feat(pipeline): introduce pipeline doc version 2 for combine-transform (#6360)

* chore: init commit of pipeline doc version v2

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: remove unused code

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: remove unused code

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add test

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add test

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: compatible with v1 to retain fields in the map during transform

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* refactor: pipeline.exec_mut

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: typo

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: change from v2 to 2 in version setting

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
shuiyisong
2025-06-21 17:58:36 -07:00
committed by GitHub
parent 15616d0c43
commit 7cd6be41ce
17 changed files with 612 additions and 1090 deletions
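As a rough illustration of the combine-transform behavior this change introduces (not taken verbatim from the code below): a version-2 pipeline doc only pins down the columns that need explicit types or indexes in the transform section and lets auto-transform infer the remaining fields. A minimal sketch, assuming a payload with illustrative fields `status` and `ts`:

# hypothetical version-2 pipeline doc; field names are illustrative
version: 2            # also accepted as the string "2"; omitting it (or using 1) keeps the v1 behavior
processors:
  - epoch:
      field: ts
      resolution: ns
transform:
  # only the columns that need explicit types/indexes are listed;
  # remaining fields in the payload are filled in by auto-transform
  - field: status
    type: uint32
  - field: ts
    type: timestamp, ns
    index: time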

View File

@@ -12,13 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use pipeline::error::Result;
use pipeline::{json_to_map, parse, Content, Pipeline};
use pipeline::{
json_to_map, parse, setup_pipeline, Content, Pipeline, PipelineContext, SchemaInfo,
};
use serde_json::{Deserializer, Value};
fn processor_mut(
pipeline: &Pipeline,
pipeline: Arc<Pipeline>,
pipeline_ctx: &PipelineContext<'_>,
schema_info: &mut SchemaInfo,
input_values: Vec<Value>,
) -> Result<Vec<greptime_proto::v1::Row>> {
let mut result = Vec::with_capacity(input_values.len());
@@ -26,7 +32,7 @@ fn processor_mut(
for v in input_values {
let payload = json_to_map(v).unwrap();
let r = pipeline
.exec_mut(payload)?
.exec_mut(payload, pipeline_ctx, schema_info)?
.into_transformed()
.expect("expect transformed result ");
result.push(r.0);
@@ -235,11 +241,25 @@ fn criterion_benchmark(c: &mut Criterion) {
.collect::<std::result::Result<Vec<_>, _>>()
.unwrap();
let pipeline = prepare_pipeline();
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let mut group = c.benchmark_group("pipeline");
group.sample_size(50);
group.bench_function("processor mut", |b| {
b.iter(|| {
processor_mut(black_box(&pipeline), black_box(input_value.clone())).unwrap();
processor_mut(
black_box(pipeline.clone()),
black_box(&pipeline_ctx),
black_box(&mut schema_info),
black_box(input_value.clone()),
)
.unwrap();
})
});
group.finish();

View File

@@ -235,14 +235,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Processor {processor}: invalid format {s}"))]
DateInvalidFormat {
s: String,
processor: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid Pattern: '{s}'. {detail}"))]
DissectInvalidPattern {
s: String,
@@ -398,10 +390,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Transform {fields:?} type MUST BE set before default {default}"))]
#[snafu(display("Transform fields must be set."))]
TransformFieldMustBeSet {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Transform {fields:?} type MUST BE set."))]
TransformTypeMustBeSet {
fields: String,
default: String,
#[snafu(implicit)]
location: Location,
},
@@ -426,11 +422,17 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Exactly one time-related processor and one timestamp value is required to use auto transform"))]
#[snafu(display("Exactly one time-related processor and one timestamp value is required to use auto transform. `ignore_missing` can not be set to true."))]
AutoTransformOneTimestamp {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid Pipeline doc version number: {}", version))]
InvalidVersionNumber {
version: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Null type not supported"))]
CoerceUnsupportedNullType {
#[snafu(implicit)]
@@ -857,7 +859,6 @@ impl ErrorExt for Error {
| DateParse { .. }
| DateFailedToGetLocalTimezone { .. }
| DateFailedToGetTimestamp { .. }
| DateInvalidFormat { .. }
| DissectInvalidPattern { .. }
| DissectEmptyPattern { .. }
| DissectSplitExceedsInput { .. }
@@ -883,11 +884,13 @@ impl ErrorExt for Error {
| UrlEncodingDecode { .. }
| TransformOnFailureInvalidValue { .. }
| TransformElementMustBeMap { .. }
| TransformFieldMustBeSet { .. }
| TransformTypeMustBeSet { .. }
| TransformColumnNameMustBeUnique { .. }
| TransformMultipleTimestampIndex { .. }
| TransformTimestampIndexCount { .. }
| AutoTransformOneTimestamp { .. }
| InvalidVersionNumber { .. }
| CoerceUnsupportedNullType { .. }
| CoerceUnsupportedNullTypeTo { .. }
| CoerceUnsupportedEpochType { .. }

View File

@@ -21,26 +21,27 @@ pub mod value;
use std::collections::BTreeMap;
use ahash::{HashMap, HashMapExt};
use api::v1::Row;
use common_time::timestamp::TimeUnit;
use itertools::Itertools;
use processor::{Processor, Processors};
use snafu::{ensure, OptionExt, ResultExt};
use transform::Transforms;
use value::Value;
use yaml_rust::YamlLoader;
use yaml_rust::{Yaml, YamlLoader};
use crate::dispatcher::{Dispatcher, Rule};
use crate::error::{
AutoTransformOneTimestampSnafu, InputValueMustBeObjectSnafu, IntermediateKeyIndexSnafu, Result,
ValueMustBeMapSnafu, YamlLoadSnafu, YamlParseSnafu,
AutoTransformOneTimestampSnafu, Error, InputValueMustBeObjectSnafu, IntermediateKeyIndexSnafu,
InvalidVersionNumberSnafu, Result, YamlLoadSnafu, YamlParseSnafu,
};
use crate::etl::ctx_req::TABLE_SUFFIX_KEY;
use crate::etl::processor::ProcessorKind;
use crate::etl::transform::transformer::greptime::values_to_row;
use crate::tablesuffix::TableSuffixTemplate;
use crate::{ContextOpt, GreptimeTransformer};
use crate::{ContextOpt, GreptimeTransformer, IdentityTimeIndex, PipelineContext, SchemaInfo};
const DESCRIPTION: &str = "description";
const DOC_VERSION: &str = "version";
const PROCESSORS: &str = "processors";
const TRANSFORM: &str = "transform";
const TRANSFORMS: &str = "transforms";
@@ -63,6 +64,8 @@ pub fn parse(input: &Content) -> Result<Pipeline> {
let description = doc[DESCRIPTION].as_str().map(|s| s.to_string());
let doc_version = (&doc[DOC_VERSION]).try_into()?;
let processors = if let Some(v) = doc[PROCESSORS].as_vec() {
v.try_into()?
} else {
@@ -82,16 +85,31 @@ pub fn parse(input: &Content) -> Result<Pipeline> {
let cnt = processors
.iter()
.filter_map(|p| match p {
ProcessorKind::Date(d) => Some(d.target_count()),
ProcessorKind::Timestamp(t) => Some(t.target_count()),
ProcessorKind::Epoch(e) => Some(e.target_count()),
ProcessorKind::Date(d) if !d.ignore_missing() => Some(
d.fields
.iter()
.map(|f| (f.target_or_input_field(), TimeUnit::Nanosecond))
.collect_vec(),
),
ProcessorKind::Epoch(e) if !e.ignore_missing() => Some(
e.fields
.iter()
.map(|f| (f.target_or_input_field(), (&e.resolution).into()))
.collect_vec(),
),
_ => None,
})
.sum::<usize>();
ensure!(cnt == 1, AutoTransformOneTimestampSnafu);
None
.flatten()
.collect_vec();
ensure!(cnt.len() == 1, AutoTransformOneTimestampSnafu);
let (ts_name, timeunit) = cnt.first().unwrap();
TransformerMode::AutoTransform(ts_name.to_string(), *timeunit)
} else {
Some(GreptimeTransformer::new(transformers)?)
TransformerMode::GreptimeTransformer(GreptimeTransformer::new(
transformers,
&doc_version,
)?)
};
let dispatcher = if !doc[DISPATCHER].is_badvalue() {
@@ -107,6 +125,7 @@ pub fn parse(input: &Content) -> Result<Pipeline> {
};
Ok(Pipeline {
doc_version,
description,
processors,
transformer,
@@ -118,15 +137,70 @@ pub fn parse(input: &Content) -> Result<Pipeline> {
}
}
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
pub enum PipelineDocVersion {
/// 1. All fields meant to be preserved have to be explicitly set in the transform section.
/// 2. Or, if no transform is set, auto-transform will be used.
#[default]
V1,
/// A combination of transform and auto-transform.
/// First it goes through the transform section,
/// then uses auto-transform to set the remaining fields.
///
/// This is useful if you only want to set the index fields
/// and let the remaining fields be auto-inferred.
V2,
}
impl TryFrom<&Yaml> for PipelineDocVersion {
type Error = Error;
fn try_from(value: &Yaml) -> Result<Self> {
if value.is_badvalue() || value.is_null() {
return Ok(PipelineDocVersion::V1);
}
let version = match value {
Yaml::String(s) => s
.parse::<i64>()
.map_err(|_| InvalidVersionNumberSnafu { version: s.clone() }.build())?,
Yaml::Integer(i) => *i,
_ => {
return InvalidVersionNumberSnafu {
version: value.as_str().unwrap_or_default().to_string(),
}
.fail();
}
};
match version {
1 => Ok(PipelineDocVersion::V1),
2 => Ok(PipelineDocVersion::V2),
_ => InvalidVersionNumberSnafu {
version: version.to_string(),
}
.fail(),
}
}
}
#[derive(Debug)]
pub struct Pipeline {
doc_version: PipelineDocVersion,
description: Option<String>,
processors: processor::Processors,
dispatcher: Option<Dispatcher>,
transformer: Option<GreptimeTransformer>,
transformer: TransformerMode,
tablesuffix: Option<TableSuffixTemplate>,
}
#[derive(Debug, Clone)]
pub enum TransformerMode {
GreptimeTransformer(GreptimeTransformer),
AutoTransform(String, TimeUnit),
}
/// Where the pipeline execution is dispatched to, with context information
#[derive(Debug, Hash, PartialEq, Eq, Clone, PartialOrd, Ord)]
pub struct DispatchedTo {
@@ -154,7 +228,6 @@ impl DispatchedTo {
#[derive(Debug)]
pub enum PipelineExecOutput {
Transformed(TransformedOutput),
AutoTransform(AutoTransformOutput),
DispatchedTo(DispatchedTo, Value),
}
@@ -163,15 +236,6 @@ pub struct TransformedOutput {
pub opt: ContextOpt,
pub row: Row,
pub table_suffix: Option<String>,
pub pipeline_map: Value,
}
#[derive(Debug)]
pub struct AutoTransformOutput {
pub table_suffix: Option<String>,
// ts_column_name -> unit
pub ts_unit_map: HashMap<String, TimeUnit>,
pub pipeline_map: Value,
}
impl PipelineExecOutput {
@@ -232,7 +296,16 @@ pub fn simd_json_array_to_map(val: Vec<simd_json::OwnedValue>) -> Result<Vec<Val
}
impl Pipeline {
pub fn exec_mut(&self, mut val: Value) -> Result<PipelineExecOutput> {
fn is_v1(&self) -> bool {
self.doc_version == PipelineDocVersion::V1
}
pub fn exec_mut(
&self,
mut val: Value,
pipeline_ctx: &PipelineContext<'_>,
schema_info: &mut SchemaInfo,
) -> Result<PipelineExecOutput> {
// process
for processor in self.processors.iter() {
val = processor.exec_mut(val)?;
@@ -243,51 +316,62 @@ impl Pipeline {
return Ok(PipelineExecOutput::DispatchedTo(rule.into(), val));
}
// do transform
if let Some(transformer) = self.transformer() {
let (mut opt, row) = transformer.transform_mut(&mut val)?;
let table_suffix = opt.resolve_table_suffix(self.tablesuffix.as_ref(), &val);
// extract the options first
// this might be a breaking change, since table_suffix is now resolved right after the processors
let mut opt = ContextOpt::from_pipeline_map_to_opt(&mut val)?;
let table_suffix = opt.resolve_table_suffix(self.tablesuffix.as_ref(), &val);
Ok(PipelineExecOutput::Transformed(TransformedOutput {
opt,
row,
table_suffix,
pipeline_map: val,
}))
} else {
// check table suffix var
let table_suffix = val
.remove(TABLE_SUFFIX_KEY)
.map(|f| f.to_str_value())
.or_else(|| self.tablesuffix.as_ref().and_then(|t| t.apply(&val)));
let mut ts_unit_map = HashMap::with_capacity(4);
// get all ts values
for (k, v) in val.as_map_mut().context(ValueMustBeMapSnafu)? {
if let Value::Timestamp(ts) = v {
if !ts_unit_map.contains_key(k) {
ts_unit_map.insert(k.clone(), ts.get_unit());
}
let row = match self.transformer() {
TransformerMode::GreptimeTransformer(greptime_transformer) => {
let values = greptime_transformer.transform_mut(&mut val, self.is_v1())?;
if self.is_v1() {
// v1 doesn't combine with auto-transform,
// so return immediately
return Ok(PipelineExecOutput::Transformed(TransformedOutput {
opt,
row: Row { values },
table_suffix,
}));
}
// continue the v2 process: check the ts column and set the remaining fields with auto-transform
// if a transformer is present, the ts has already been set
values_to_row(schema_info, val, pipeline_ctx, Some(values))?
}
Ok(PipelineExecOutput::AutoTransform(AutoTransformOutput {
table_suffix,
ts_unit_map,
pipeline_map: val,
}))
}
TransformerMode::AutoTransform(ts_name, time_unit) => {
// infer ts from the context;
// we've checked that exactly one timestamp exists
// Create pipeline context with the found timestamp
let def = crate::PipelineDefinition::GreptimeIdentityPipeline(Some(
IdentityTimeIndex::Epoch(ts_name.to_string(), *time_unit, false),
));
let n_ctx =
PipelineContext::new(&def, pipeline_ctx.pipeline_param, pipeline_ctx.channel);
values_to_row(schema_info, val, &n_ctx, None)?
}
};
Ok(PipelineExecOutput::Transformed(TransformedOutput {
opt,
row,
table_suffix,
}))
}
pub fn processors(&self) -> &processor::Processors {
&self.processors
}
pub fn transformer(&self) -> Option<&GreptimeTransformer> {
self.transformer.as_ref()
pub fn transformer(&self) -> &TransformerMode {
&self.transformer
}
// this method is for test purposes only
pub fn schemas(&self) -> Option<&Vec<greptime_proto::v1::ColumnSchema>> {
self.transformer.as_ref().map(|t| t.schemas())
match &self.transformer {
TransformerMode::GreptimeTransformer(t) => Some(t.schemas()),
TransformerMode::AutoTransform(_, _) => None,
}
}
}
@@ -298,8 +382,35 @@ pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str
.context(IntermediateKeyIndexSnafu { kind, key })
}
/// This macro is for tests only; do not use it in production.
/// The schema_info cannot be used in auto-transform ts-infer mode because it lacks the ts schema.
///
/// Usage:
/// ```rust
/// let (pipeline, schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
/// let pipeline_ctx = PipelineContext::new(&pipeline_def, &pipeline_param, Channel::Unknown);
/// ```
#[macro_export]
macro_rules! setup_pipeline {
($pipeline:expr) => {{
use std::sync::Arc;
use $crate::{GreptimePipelineParams, Pipeline, PipelineDefinition, SchemaInfo};
let pipeline: Arc<Pipeline> = Arc::new($pipeline);
let schema = pipeline.schemas().unwrap();
let schema_info = SchemaInfo::from_schema_list(schema.clone());
let pipeline_def = PipelineDefinition::Resolved(pipeline.clone());
let pipeline_param = GreptimePipelineParams::default();
(pipeline, schema_info, pipeline_def, pipeline_param)
}};
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::Rows;
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{self, ColumnDataType, SemanticType};
@@ -311,7 +422,8 @@ mod tests {
let input_value_str = r#"
{
"my_field": "1,2",
"foo": "bar"
"foo": "bar",
"ts": "1"
}
"#;
let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();
@@ -321,16 +433,30 @@ processors:
- csv:
field: my_field
target_fields: field1, field2
- epoch:
field: ts
resolution: ns
transform:
- field: field1
type: uint32
- field: field2
type: uint32
- field: ts
type: timestamp, ns
index: time
"#;
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let payload = json_to_map(input_value).unwrap();
let result = pipeline
.exec_mut(payload)
.exec_mut(payload, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.unwrap();
@@ -339,7 +465,7 @@ transform:
assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2)));
match &result.0.values[2].value_data {
Some(ValueData::TimestampNanosecondValue(v)) => {
assert_ne!(*v, 0);
assert_ne!(v, &0);
}
_ => panic!("expect null value"),
}
@@ -354,7 +480,7 @@ transform:
- message
patterns:
- "%{ip} %{?ignored} %{username} [%{ts}] \"%{method} %{path} %{proto}\" %{status} %{bytes}"
- timestamp:
- date:
fields:
- ts
formats:
@@ -378,17 +504,31 @@ transform:
type: timestamp, ns
index: time"#;
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_str)).unwrap();
let pipeline = Arc::new(pipeline);
let schema = pipeline.schemas().unwrap();
let mut schema_info = SchemaInfo::from_schema_list(schema.clone());
let pipeline_def = crate::PipelineDefinition::Resolved(pipeline.clone());
let pipeline_param = crate::GreptimePipelineParams::default();
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let mut payload = BTreeMap::new();
payload.insert("message".to_string(), Value::String(message));
let payload = Value::Map(payload.into());
let result = pipeline
.exec_mut(payload)
.exec_mut(payload, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.unwrap();
let sechema = pipeline.schemas().unwrap();
assert_eq!(sechema.len(), result.0.values.len());
// println!("[DEBUG]schema_info: {:?}", schema_info.schema);
// println!("[DEBUG]re: {:?}", result.0.values);
assert_eq!(schema_info.schema.len(), result.0.values.len());
let test = vec![
(
ColumnDataType::String as i32,
@@ -425,8 +565,10 @@ transform:
Some(ValueData::TimestampNanosecondValue(1722493367000000000)),
),
];
for i in 0..sechema.len() {
let schema = &sechema[i];
// manually set schema
let schema = pipeline.schemas().unwrap();
for i in 0..schema.len() {
let schema = &schema[i];
let value = &result.0.values[i];
assert_eq!(schema.datatype, test[i].0);
assert_eq!(value.value_data, test[i].1);
@@ -438,7 +580,8 @@ transform:
let input_value_str = r#"
{
"my_field": "1,2",
"foo": "bar"
"foo": "bar",
"ts": "1"
}
"#;
let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();
@@ -449,17 +592,30 @@ transform:
- csv:
field: my_field
target_fields: field1, field2
- epoch:
field: ts
resolution: ns
transform:
- field: field1
type: uint32
- field: field2
type: uint32
- field: ts
type: timestamp, ns
index: time
"#;
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let payload = json_to_map(input_value).unwrap();
let result = pipeline
.exec_mut(payload)
.exec_mut(payload, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.unwrap();
@@ -467,7 +623,7 @@ transform:
assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2)));
match &result.0.values[2].value_data {
Some(ValueData::TimestampNanosecondValue(v)) => {
assert_ne!(*v, 0);
assert_ne!(v, &0);
}
_ => panic!("expect null value"),
}
@@ -488,7 +644,7 @@ transform:
description: Pipeline for Apache Tomcat
processors:
- timestamp:
- date:
field: test_time
transform:
@@ -498,11 +654,22 @@ transform:
"#;
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
let pipeline = Arc::new(pipeline);
let schema = pipeline.schemas().unwrap();
let mut schema_info = SchemaInfo::from_schema_list(schema.clone());
let pipeline_def = crate::PipelineDefinition::Resolved(pipeline.clone());
let pipeline_param = crate::GreptimePipelineParams::default();
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let schema = pipeline.schemas().unwrap().clone();
let result = json_to_map(input_value).unwrap();
let row = pipeline
.exec_mut(result)
.exec_mut(result, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.unwrap();
@@ -536,6 +703,9 @@ transform:
description: Pipeline for Apache Tomcat
processors:
- epoch:
field: ts
resolution: ns
dispatcher:
field: typename
@@ -549,7 +719,9 @@ dispatcher:
transform:
- field: typename
type: string
- field: ts
type: timestamp, ns
index: time
"#;
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
let dispatcher = pipeline.dispatcher.expect("expect dispatcher");
@@ -580,6 +752,9 @@ transform:
description: Pipeline for Apache Tomcat
processors:
- epoch:
field: ts
resolution: ns
dispatcher:
_field: typename
@@ -593,14 +768,18 @@ dispatcher:
transform:
- field: typename
type: string
- field: ts
type: timestamp, ns
index: time
"#;
let bad_yaml2 = r#"
---
description: Pipeline for Apache Tomcat
processors:
- epoch:
field: ts
resolution: ns
dispatcher:
field: typename
rules:
@@ -613,14 +792,18 @@ dispatcher:
transform:
- field: typename
type: string
- field: ts
type: timestamp, ns
index: time
"#;
let bad_yaml3 = r#"
---
description: Pipeline for Apache Tomcat
processors:
- epoch:
field: ts
resolution: ns
dispatcher:
field: typename
rules:
@@ -633,7 +816,9 @@ dispatcher:
transform:
- field: typename
type: string
- field: ts
type: timestamp, ns
index: time
"#;
let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml1));

View File

@@ -27,7 +27,6 @@ pub mod letter;
pub mod regex;
pub mod select;
pub mod simple_extract;
pub mod timestamp;
pub mod urlencoding;
pub mod vrl;
@@ -47,7 +46,6 @@ use json_path::JsonPathProcessor;
use letter::LetterProcessor;
use regex::RegexProcessor;
use snafu::{OptionExt, ResultExt};
use timestamp::TimestampProcessor;
use urlencoding::UrlEncodingProcessor;
use crate::error::{
@@ -138,7 +136,6 @@ pub enum ProcessorKind {
Join(JoinProcessor),
Letter(LetterProcessor),
Regex(RegexProcessor),
Timestamp(TimestampProcessor),
UrlEncoding(UrlEncodingProcessor),
Epoch(EpochProcessor),
Date(DateProcessor),
@@ -211,9 +208,6 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorKind> {
join::PROCESSOR_JOIN => ProcessorKind::Join(JoinProcessor::try_from(value)?),
letter::PROCESSOR_LETTER => ProcessorKind::Letter(LetterProcessor::try_from(value)?),
regex::PROCESSOR_REGEX => ProcessorKind::Regex(RegexProcessor::try_from(value)?),
timestamp::PROCESSOR_TIMESTAMP => {
ProcessorKind::Timestamp(TimestampProcessor::try_from(value)?)
}
urlencoding::PROCESSOR_URL_ENCODING => {
ProcessorKind::UrlEncoding(UrlEncodingProcessor::try_from(value)?)
}
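Since the `timestamp` processor is removed by this change (its module and registration are deleted here, and the tests below switch to `date` and `epoch`), existing pipeline docs that used it need a small rewrite: string formats move to the `date` processor and numeric resolutions move to `epoch`. A hedged before/after sketch, reusing field names from the removed tests:

# before (no longer parses after this change)
processors:
  - timestamp:
      field: input_str
      formats:
        - "%Y-%m-%dT%H:%M:%S%.3fZ"
  - timestamp:
      field: input_s
      resolution: s

# after
processors:
  - date:
      field: input_str
      formats:
        - "%Y-%m-%dT%H:%M:%S%.3fZ"
  - epoch:
      field: input_s
      resolution: s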

View File

@@ -148,7 +148,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for DateProcessor {
/// Reserved for compatibility only
#[derive(Debug, Default)]
pub struct DateProcessor {
fields: Fields,
pub(crate) fields: Fields,
formats: Formats,
timezone: Option<Arc<String>>,
locale: Option<Arc<String>>, // to support locale
@@ -162,10 +162,6 @@ pub struct DateProcessor {
}
impl DateProcessor {
pub(crate) fn target_count(&self) -> usize {
self.fields.len()
}
fn parse(&self, val: &str) -> Result<Timestamp> {
let mut tz = Tz::UTC;
if let Some(timezone) = &self.timezone {

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_time::timestamp::TimeUnit;
use snafu::{OptionExt, ResultExt};
use crate::error::{
@@ -34,7 +35,7 @@ pub(crate) const PROCESSOR_EPOCH: &str = "epoch";
const RESOLUTION_NAME: &str = "resolution";
#[derive(Debug, Default)]
enum Resolution {
pub(crate) enum Resolution {
Second,
#[default]
Milli,
@@ -56,13 +57,24 @@ impl TryFrom<&str> for Resolution {
}
}
impl From<&Resolution> for TimeUnit {
fn from(resolution: &Resolution) -> Self {
match resolution {
Resolution::Second => TimeUnit::Second,
Resolution::Milli => TimeUnit::Millisecond,
Resolution::Micro => TimeUnit::Microsecond,
Resolution::Nano => TimeUnit::Nanosecond,
}
}
}
/// support string, integer, float, time, epoch
/// deprecated it should be removed in the future
/// Reserved for compatibility only
#[derive(Debug, Default)]
pub struct EpochProcessor {
fields: Fields,
resolution: Resolution,
pub(crate) fields: Fields,
pub(crate) resolution: Resolution,
ignore_missing: bool,
// description
// if
@@ -110,10 +122,6 @@ impl EpochProcessor {
Resolution::Nano => Ok(Timestamp::Nanosecond(t)),
}
}
pub(crate) fn target_count(&self) -> usize {
self.fields.len()
}
}
impl TryFrom<&yaml_rust::yaml::Hash> for EpochProcessor {

View File

@@ -1,418 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use chrono::{DateTime, NaiveDateTime};
use chrono_tz::Tz;
use lazy_static::lazy_static;
use snafu::{OptionExt, ResultExt};
use crate::error::{
DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateInvalidFormatSnafu,
DateParseSnafu, DateParseTimezoneSnafu, EpochInvalidResolutionSnafu, Error,
KeyMustBeStringSnafu, ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu,
ProcessorUnsupportedValueSnafu, Result,
};
use crate::etl::field::Fields;
use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, FIELDS_NAME,
FIELD_NAME, IGNORE_MISSING_NAME,
};
use crate::etl::value::time::{
MICROSECOND_RESOLUTION, MICRO_RESOLUTION, MILLISECOND_RESOLUTION, MILLI_RESOLUTION,
MS_RESOLUTION, NANOSECOND_RESOLUTION, NANO_RESOLUTION, NS_RESOLUTION, SECOND_RESOLUTION,
SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION,
};
use crate::etl::value::{Timestamp, Value};
pub(crate) const PROCESSOR_TIMESTAMP: &str = "timestamp";
const RESOLUTION_NAME: &str = "resolution";
const FORMATS_NAME: &str = "formats"; // default RFC3339
lazy_static! {
static ref DEFAULT_FORMATS: Vec<(Arc<String>,Tz)> = vec![
// timezone with colon
"%Y-%m-%dT%H:%M:%S%:z",
"%Y-%m-%dT%H:%M:%S%.3f%:z",
"%Y-%m-%dT%H:%M:%S%.6f%:z",
"%Y-%m-%dT%H:%M:%S%.9f%:z",
// timezone without colon
"%Y-%m-%dT%H:%M:%S%z",
"%Y-%m-%dT%H:%M:%S%.3f%z",
"%Y-%m-%dT%H:%M:%S%.6f%z",
"%Y-%m-%dT%H:%M:%S%.9f%z",
// without timezone
"%Y-%m-%dT%H:%M:%SZ",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%dT%H:%M:%S%.3f",
"%Y-%m-%dT%H:%M:%S%.6f",
"%Y-%m-%dT%H:%M:%S%.9f",
]
.iter()
.map(|s| (Arc::new(s.to_string()),Tz::UCT))
.collect();
}
#[derive(Debug, Default)]
enum Resolution {
Second,
#[default]
Milli,
Micro,
Nano,
}
impl TryFrom<&str> for Resolution {
type Error = Error;
fn try_from(s: &str) -> Result<Self> {
match s {
SECOND_RESOLUTION | SEC_RESOLUTION | S_RESOLUTION => Ok(Resolution::Second),
MILLISECOND_RESOLUTION | MILLI_RESOLUTION | MS_RESOLUTION => Ok(Resolution::Milli),
MICROSECOND_RESOLUTION | MICRO_RESOLUTION | US_RESOLUTION => Ok(Resolution::Micro),
NANOSECOND_RESOLUTION | NANO_RESOLUTION | NS_RESOLUTION => Ok(Resolution::Nano),
_ => EpochInvalidResolutionSnafu { resolution: s }.fail(),
}
}
}
#[derive(Debug)]
struct Formats(Vec<(Arc<String>, Tz)>);
impl Formats {
fn new(mut formats: Vec<(Arc<String>, Tz)>) -> Self {
formats.sort_by_key(|(key, _)| key.clone());
formats.dedup();
Formats(formats)
}
}
impl Default for Formats {
fn default() -> Self {
Formats(DEFAULT_FORMATS.clone())
}
}
impl std::ops::Deref for Formats {
type Target = Vec<(Arc<String>, Tz)>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// support string, integer, float, time, epoch
#[derive(Debug, Default)]
pub struct TimestampProcessor {
fields: Fields,
formats: Formats,
resolution: Resolution,
ignore_missing: bool,
// description
// if
// ignore_failure
// on_failure
// tag
}
impl TimestampProcessor {
/// try to parse val with timezone first, if failed, parse without timezone
fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result<i64> {
if let Ok(dt) = DateTime::parse_from_str(val, fmt) {
Ok(dt
.timestamp_nanos_opt()
.context(DateFailedToGetTimestampSnafu)?)
} else {
let dt = NaiveDateTime::parse_from_str(val, fmt)
.context(DateParseSnafu { value: val })?
.and_local_timezone(tz)
.single()
.context(DateFailedToGetLocalTimezoneSnafu)?;
Ok(dt
.timestamp_nanos_opt()
.context(DateFailedToGetTimestampSnafu)?)
}
}
fn parse_time_str(&self, val: &str) -> Result<i64> {
for (fmt, tz) in self.formats.iter() {
if let Ok(ns) = Self::try_parse(val, fmt, *tz) {
return Ok(ns);
}
}
ProcessorFailedToParseStringSnafu {
kind: PROCESSOR_TIMESTAMP,
value: val.to_string(),
}
.fail()
}
fn parse(&self, val: &Value) -> Result<Timestamp> {
let t: i64 = match val {
Value::String(s) => {
let t = s.parse::<i64>();
match t {
Ok(t) => t,
Err(_) => {
let ns = self.parse_time_str(s)?;
return Ok(Timestamp::Nanosecond(ns));
}
}
}
Value::Int16(i) => *i as i64,
Value::Int32(i) => *i as i64,
Value::Int64(i) => *i,
Value::Uint8(i) => *i as i64,
Value::Uint16(i) => *i as i64,
Value::Uint32(i) => *i as i64,
Value::Uint64(i) => *i as i64,
Value::Float32(f) => *f as i64,
Value::Float64(f) => *f as i64,
Value::Timestamp(e) => match self.resolution {
Resolution::Second => e.timestamp(),
Resolution::Milli => e.timestamp_millis(),
Resolution::Micro => e.timestamp_micros(),
Resolution::Nano => e.timestamp_nanos(),
},
_ => {
return ProcessorUnsupportedValueSnafu {
processor: PROCESSOR_TIMESTAMP,
val: val.to_string(),
}
.fail();
}
};
match self.resolution {
Resolution::Second => Ok(Timestamp::Second(t)),
Resolution::Milli => Ok(Timestamp::Millisecond(t)),
Resolution::Micro => Ok(Timestamp::Microsecond(t)),
Resolution::Nano => Ok(Timestamp::Nanosecond(t)),
}
}
pub(crate) fn target_count(&self) -> usize {
self.fields.len()
}
}
fn parse_formats(yaml: &yaml_rust::yaml::Yaml) -> Result<Vec<(Arc<String>, Tz)>> {
match yaml.as_vec() {
Some(formats_yaml) => {
let mut formats = Vec::with_capacity(formats_yaml.len());
for v in formats_yaml {
let s = yaml_strings(v, FORMATS_NAME)
.or(yaml_string(v, FORMATS_NAME).map(|s| vec![s]))?;
if s.len() != 1 && s.len() != 2 {
return DateInvalidFormatSnafu {
processor: PROCESSOR_TIMESTAMP,
s: format!("{s:?}"),
}
.fail();
}
let mut iter = s.into_iter();
// safety: unwrap is safe here
let formatter = iter.next().unwrap();
let tz = iter
.next()
.map(|tz| {
tz.parse::<Tz>()
.context(DateParseTimezoneSnafu { value: tz })
})
.unwrap_or(Ok(Tz::UTC))?;
formats.push((Arc::new(formatter), tz));
}
Ok(formats)
}
None => DateInvalidFormatSnafu {
processor: PROCESSOR_TIMESTAMP,
s: format!("{yaml:?}"),
}
.fail(),
}
}
impl TryFrom<&yaml_rust::yaml::Hash> for TimestampProcessor {
type Error = Error;
fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
let mut fields = Fields::default();
let mut formats = Formats::default();
let mut resolution = Resolution::default();
let mut ignore_missing = false;
for (k, v) in hash {
let key = k
.as_str()
.with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
match key {
FIELD_NAME => {
fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
}
FIELDS_NAME => {
fields = yaml_new_fields(v, FIELDS_NAME)?;
}
FORMATS_NAME => {
let formats_vec = parse_formats(v)?;
formats = Formats::new(formats_vec);
}
RESOLUTION_NAME => {
resolution = yaml_string(v, RESOLUTION_NAME)?.as_str().try_into()?;
}
IGNORE_MISSING_NAME => {
ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
}
_ => {}
}
}
let processor_builder = TimestampProcessor {
fields,
formats,
resolution,
ignore_missing,
};
Ok(processor_builder)
}
}
impl Processor for TimestampProcessor {
fn kind(&self) -> &str {
PROCESSOR_TIMESTAMP
}
fn ignore_missing(&self) -> bool {
self.ignore_missing
}
fn exec_mut(&self, mut val: Value) -> Result<Value> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
Some(Value::Null) | None => {
if !self.ignore_missing {
return ProcessorMissingFieldSnafu {
processor: self.kind(),
field: field.input_field(),
}
.fail();
}
}
Some(v) => {
let result = self.parse(v)?;
let output_key = field.target_or_input_field();
val.insert(output_key.to_string(), Value::Timestamp(result))?;
}
}
}
Ok(val)
}
}
#[cfg(test)]
mod tests {
use yaml_rust::YamlLoader;
use super::TimestampProcessor;
use crate::etl::value::{Timestamp, Value};
#[test]
fn test_parse_epoch() {
let processor_yaml_str = r#"fields:
- hello
resolution: s
formats:
- "%Y-%m-%dT%H:%M:%S%:z"
- "%Y-%m-%dT%H:%M:%S%.3f%:z"
- "%Y-%m-%dT%H:%M:%S"
- "%Y-%m-%dT%H:%M:%SZ"
"#;
let yaml = &YamlLoader::load_from_str(processor_yaml_str).unwrap()[0];
let timestamp_yaml = yaml.as_hash().unwrap();
let processor = TimestampProcessor::try_from(timestamp_yaml).unwrap();
let values = [
(
Value::String("1573840000".into()),
Timestamp::Second(1573840000),
),
(Value::Int32(1573840001), Timestamp::Second(1573840001)),
(Value::Uint64(1573840002), Timestamp::Second(1573840002)),
// float32 has a problem expressing the timestamp.
// 1573840003.0_f32 as i64 is 1573840000
//(Value::Float32(1573840003.0), Epoch::Second(1573840003)),
(
Value::String("2019-11-15T17:46:40Z".into()),
Timestamp::Nanosecond(1573840000000000000),
),
];
for (value, result) in values {
let parsed = processor.parse(&value).unwrap();
assert_eq!(parsed, result);
}
let values: Vec<&str> = vec![
"2014-5-17T12:34:56",
"2014-5-17T12:34:56Z",
"2014-5-17T12:34:56+09:30",
"2014-5-17T12:34:56.000+09:30",
"2014-5-17T12:34:56-0930",
"2014-5-17T12:34:56.000-0930",
]
.into_iter()
.collect();
for value in values {
let parsed = processor.parse(&Value::String(value.into()));
assert!(parsed.is_ok());
}
}
#[test]
fn test_parse_with_timezone() {
let processor_yaml_str = r#"fields:
- hello
resolution: s
formats:
- ["%Y-%m-%dT%H:%M:%S%:z", "Asia/Tokyo"]
- ["%Y-%m-%dT%H:%M:%S%.3f%:z", "Asia/Tokyo"]
- ["%Y-%m-%dT%H:%M:%S", "Asia/Tokyo"]
- ["%Y-%m-%dT%H:%M:%SZ", "Asia/Tokyo"]
"#;
let yaml = &YamlLoader::load_from_str(processor_yaml_str).unwrap()[0];
let timestamp_yaml = yaml.as_hash().unwrap();
let processor = TimestampProcessor::try_from(timestamp_yaml).unwrap();
let values: Vec<&str> = vec![
"2014-5-17T12:34:56",
"2014-5-17T12:34:56Z",
"2014-5-17T12:34:56+09:30",
"2014-5-17T12:34:56.000+09:30",
"2014-5-17T12:34:56-0930",
"2014-5-17T12:34:56.000-0930",
]
.into_iter()
.collect();
for value in values {
let parsed = processor.parse(&Value::String(value.into()));
assert!(parsed.is_ok());
}
}
}

View File

@@ -15,11 +15,11 @@
pub mod index;
pub mod transformer;
use snafu::OptionExt;
use snafu::{ensure, OptionExt};
use crate::error::{
Error, KeyMustBeStringSnafu, Result, TransformElementMustBeMapSnafu,
TransformOnFailureInvalidValueSnafu, TransformTypeMustBeSetSnafu,
TransformFieldMustBeSetSnafu, TransformOnFailureInvalidValueSnafu, TransformTypeMustBeSetSnafu,
};
use crate::etl::field::Fields;
use crate::etl::processor::{yaml_bool, yaml_new_field, yaml_new_fields, yaml_string};
@@ -216,25 +216,30 @@ impl TryFrom<&yaml_rust::yaml::Hash> for Transform {
_ => {}
}
}
let mut final_default = None;
if let Some(default_value) = default {
match (&type_, &default_value) {
(Value::Null, _) => {
return TransformTypeMustBeSetSnafu {
fields: format!("{:?}", fields),
default: default_value.to_string(),
}
.fail();
}
(_, Value::Null) => {} // if default is not set, then it will be regarded as default null
(_, _) => {
// ensure fields and type
ensure!(!fields.is_empty(), TransformFieldMustBeSetSnafu);
ensure!(
type_ != Value::Null,
TransformTypeMustBeSetSnafu {
fields: format!("{:?}", fields)
}
);
let final_default = if let Some(default_value) = default {
match default_value {
// if default is not set, then it will be regarded as default null
Value::Null => None,
_ => {
let target = type_.parse_str_value(default_value.to_str_value().as_str())?;
final_default = Some(target);
on_failure = Some(OnFailure::Default);
Some(target)
}
}
}
} else {
None
};
let builder = Transform {
fields,
type_,

View File

@@ -42,6 +42,7 @@ use crate::etl::field::{Field, Fields};
use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transforms};
use crate::etl::value::{Timestamp, Value};
use crate::etl::PipelineDocVersion;
use crate::{unwrap_or_continue_if_err, Map, PipelineContext};
const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
@@ -160,7 +161,7 @@ impl GreptimeTransformer {
}
impl GreptimeTransformer {
pub fn new(mut transforms: Transforms) -> Result<Self> {
pub fn new(mut transforms: Transforms, doc_version: &PipelineDocVersion) -> Result<Self> {
// empty check is done in the caller
let mut column_names_set = HashSet::new();
let mut timestamp_columns = vec![];
@@ -202,34 +203,34 @@ impl GreptimeTransformer {
}
}
match timestamp_columns.len() {
0 => {
let schema = match timestamp_columns.len() {
0 if doc_version == &PipelineDocVersion::V1 => {
// compatible with v1, add a default timestamp column
GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);
let schema = GreptimeTransformer::init_schemas(&transforms)?;
Ok(GreptimeTransformer { transforms, schema })
GreptimeTransformer::init_schemas(&transforms)?
}
1 => {
let schema = GreptimeTransformer::init_schemas(&transforms)?;
Ok(GreptimeTransformer { transforms, schema })
1 => GreptimeTransformer::init_schemas(&transforms)?,
count => {
let columns = timestamp_columns.iter().join(", ");
return TransformTimestampIndexCountSnafu { count, columns }.fail();
}
_ => {
let columns: String = timestamp_columns.iter().map(|s| s.to_string()).join(", ");
let count = timestamp_columns.len();
TransformTimestampIndexCountSnafu { count, columns }.fail()
}
}
};
Ok(GreptimeTransformer { transforms, schema })
}
pub fn transform_mut(&self, pipeline_map: &mut Value) -> Result<(ContextOpt, Row)> {
let opt = ContextOpt::from_pipeline_map_to_opt(pipeline_map)?;
pub fn transform_mut(
&self,
pipeline_map: &mut Value,
is_v1: bool,
) -> Result<Vec<GreptimeValue>> {
let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
let mut output_index = 0;
for transform in self.transforms.iter() {
for field in transform.fields.iter() {
let index = field.input_field();
match pipeline_map.get(index) {
let column_name = field.input_field();
// keep using `get` here to stay compatible with v1
match pipeline_map.get(column_name) {
Some(v) => {
let value_data = coerce_value(v, transform)?;
// every transform fields has only one output field
@@ -256,9 +257,14 @@ impl GreptimeTransformer {
}
}
output_index += 1;
if !is_v1 {
// remove the column from the pipeline_map
// so that auto-transform can use the remaining fields
pipeline_map.remove(column_name);
}
}
}
Ok((opt, Row { values }))
Ok(values)
}
pub fn transforms(&self) -> &Transforms {
@@ -292,6 +298,17 @@ impl SchemaInfo {
index: HashMap::with_capacity(capacity),
}
}
pub fn from_schema_list(schema_list: Vec<ColumnSchema>) -> Self {
let mut index = HashMap::new();
for (i, schema) in schema_list.iter().enumerate() {
index.insert(schema.column_name.clone(), i);
}
Self {
schema: schema_list,
index,
}
}
}
fn resolve_schema(
@@ -398,12 +415,14 @@ fn calc_ts(p_ctx: &PipelineContext, values: &Value) -> Result<Option<ValueData>>
}
}
fn values_to_row(
pub(crate) fn values_to_row(
schema_info: &mut SchemaInfo,
values: Value,
pipeline_ctx: &PipelineContext<'_>,
row: Option<Vec<GreptimeValue>>,
) -> Result<Row> {
let mut row: Vec<GreptimeValue> = Vec::with_capacity(schema_info.schema.len());
let mut row: Vec<GreptimeValue> =
row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len()));
let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
// calculate timestamp value based on the channel
@@ -411,9 +430,7 @@ fn values_to_row(
row.push(GreptimeValue { value_data: ts });
for _ in 1..schema_info.schema.len() {
row.push(GreptimeValue { value_data: None });
}
row.resize(schema_info.schema.len(), GreptimeValue { value_data: None });
// skip ts column
let ts_column_name = custom_ts
@@ -591,7 +608,7 @@ fn identity_pipeline_inner(
skip_error
);
let row = unwrap_or_continue_if_err!(
values_to_row(&mut schema_info, pipeline_map, pipeline_ctx),
values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None),
skip_error
);

View File

@@ -330,6 +330,13 @@ impl Value {
}
}
pub fn as_map(&self) -> Option<&BTreeMap<String, Self>> {
match self {
Value::Map(map) => Some(map),
_ => None,
}
}
pub fn into_map(self) -> Option<BTreeMap<String, Self>> {
match self {
Value::Map(map) => Some(map.values),

View File

@@ -26,8 +26,8 @@ pub use etl::transform::transformer::identity_pipeline;
pub use etl::transform::GreptimeTransformer;
pub use etl::value::{Array, Map, Value};
pub use etl::{
json_array_to_map, json_to_map, parse, simd_json_array_to_map, simd_json_to_map,
AutoTransformOutput, Content, DispatchedTo, Pipeline, PipelineExecOutput, TransformedOutput,
json_array_to_map, json_to_map, parse, simd_json_array_to_map, simd_json_to_map, Content,
DispatchedTo, Pipeline, PipelineExecOutput, TransformedOutput, TransformerMode,
};
pub use manager::{
pipeline_operator, table, util, IdentityTimeIndex, PipelineContext, PipelineDefinition,

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use greptime_proto::v1::{ColumnDataType, ColumnSchema, Rows, SemanticType};
use pipeline::{json_to_map, parse, Content, Pipeline};
use pipeline::{json_to_map, parse, setup_pipeline, Content, Pipeline, PipelineContext};
/// test util function to parse and execute pipeline
pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
@@ -22,7 +22,12 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline = parse(&yaml_content).expect("failed to parse pipeline");
let schema = pipeline.schemas().unwrap().clone();
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let mut rows = Vec::new();
@@ -31,7 +36,7 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
for value in array {
let intermediate_status = json_to_map(value).unwrap();
let row = pipeline
.exec_mut(intermediate_status)
.exec_mut(intermediate_status, &pipeline_ctx, &mut schema_info)
.expect("failed to exec pipeline")
.into_transformed()
.expect("expect transformed result ");
@@ -41,7 +46,7 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
serde_json::Value::Object(_) => {
let intermediate_status = json_to_map(input_value).unwrap();
let row = pipeline
.exec_mut(intermediate_status)
.exec_mut(intermediate_status, &pipeline_ctx, &mut schema_info)
.expect("failed to exec pipeline")
.into_transformed()
.expect("expect transformed result ");
@@ -52,7 +57,10 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
}
}
Rows { schema, rows }
Rows {
schema: schema_info.schema.clone(),
rows,
}
}
/// test util function to create column schema

View File

@@ -16,7 +16,7 @@ mod common;
use greptime_proto::v1::value::ValueData::StringValue;
use greptime_proto::v1::{ColumnDataType, SemanticType};
use pipeline::json_to_map;
use pipeline::{json_to_map, setup_pipeline, PipelineContext};
fn make_string_column_schema(name: String) -> greptime_proto::v1::ColumnSchema {
common::make_column_schema(name, ColumnDataType::String, SemanticType::Field)
@@ -274,9 +274,17 @@ transform:
let yaml_content = pipeline::Content::Yaml(pipeline_yaml);
let pipeline: pipeline::Pipeline =
pipeline::parse(&yaml_content).expect("failed to parse pipeline");
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let result = json_to_map(input_value).unwrap();
let row = pipeline.exec_mut(result);
let row = pipeline.exec_mut(result, &pipeline_ctx, &mut schema_info);
assert!(row.is_err());
assert_eq!(row.err().unwrap().to_string(), "No matching pattern found");

View File

@@ -20,7 +20,7 @@ use greptime_proto::v1::value::ValueData::{
U32Value, U64Value, U8Value,
};
use greptime_proto::v1::Value as GreptimeValue;
use pipeline::{json_to_map, parse, Content, Pipeline};
use pipeline::{json_to_map, parse, setup_pipeline, Content, Pipeline, PipelineContext};
#[test]
fn test_complex_data() {
@@ -419,10 +419,16 @@ transform:
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline = parse(&yaml_content).expect("failed to parse pipeline");
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let stats = json_to_map(input_value).unwrap();
let row = pipeline
.exec_mut(stats)
.exec_mut(stats, &pipeline_ctx, &mut schema_info)
.expect("failed to exec pipeline")
.into_transformed()
.expect("expect transformed result ");
@@ -487,10 +493,16 @@ transform:
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline = parse(&yaml_content).unwrap();
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let status = json_to_map(input_value).unwrap();
let row = pipeline
.exec_mut(status)
.exec_mut(status, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.expect("expect transformed result ");
@@ -596,10 +608,16 @@ transform:
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline = parse(&yaml_content).unwrap();
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let status = json_to_map(input_value).unwrap();
let row = pipeline
.exec_mut(status)
.exec_mut(status, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.expect("expect transformed result ");
@@ -662,10 +680,16 @@ transform:
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline = parse(&yaml_content).unwrap();
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let status = json_to_map(input_value).unwrap();
let row = pipeline
.exec_mut(status)
.exec_mut(status, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.expect("expect transformed result ");
@@ -702,10 +726,16 @@ transform:
"#;
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline = parse(&yaml_content).unwrap();
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let status = json_to_map(input_value).unwrap();
let row = pipeline
.exec_mut(status)
.exec_mut(status, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.expect("expect transformed result ");
@@ -761,10 +791,16 @@ transform:
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline = parse(&yaml_content).unwrap();
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let status = json_to_map(input_value).unwrap();
let row = pipeline
.exec_mut(status)
.exec_mut(status, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.expect("expect transformed result ");
@@ -802,10 +838,16 @@ transform:
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline = parse(&yaml_content).unwrap();
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let status = json_to_map(input_value).unwrap();
let row = pipeline
.exec_mut(status)
.exec_mut(status, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.expect("expect transformed result ");
@@ -864,10 +906,16 @@ transform:
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline = parse(&yaml_content).unwrap();
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let status = json_to_map(input_value1).unwrap();
let dispatched_to = pipeline
.exec_mut(status)
.exec_mut(status, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_dispatched()
.expect("expect dispatched result ");
@@ -876,7 +924,7 @@ transform:
let status = json_to_map(input_value2).unwrap();
let row = pipeline
.exec_mut(status)
.exec_mut(status, &pipeline_ctx, &mut schema_info)
.unwrap()
.into_transformed()
.expect("expect transformed result ");
@@ -928,9 +976,17 @@ table_suffix: _${logger}
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline = parse(&yaml_content).unwrap();
let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
let pipeline_ctx = PipelineContext::new(
&pipeline_def,
&pipeline_param,
session::context::Channel::Unknown,
);
let status = json_to_map(input_value).unwrap();
let exec_re = pipeline.exec_mut(status).unwrap();
let exec_re = pipeline
.exec_mut(status, &pipeline_ctx, &mut schema_info)
.unwrap();
let (row, table_name) = exec_re.into_transformed().unwrap();
let values = row.values;

View File

@@ -1,422 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod common;
use api::v1::ColumnSchema;
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{ColumnDataType, SemanticType};
use lazy_static::lazy_static;
const TEST_INPUT: &str = r#"
{
"input_str": "2024-06-27T06:13:36.991Z"
}"#;
const TEST_VALUE: Option<ValueData> =
Some(ValueData::TimestampNanosecondValue(1719468816991000000));
lazy_static! {
static ref EXPECTED_SCHEMA: Vec<ColumnSchema> = vec![
common::make_column_schema(
"ts".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
}
#[test]
fn test_timestamp_parse_date() {
let pipeline_yaml = r#"
processors:
- timestamp:
fields:
- input_str
formats:
- "%Y-%m-%dT%H:%M:%S%.3fZ"
transform:
- fields:
- input_str, ts
type: time
"#;
let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(output.rows[0].values[0].value_data, TEST_VALUE);
}
#[test]
fn test_timestamp_multi_formats() {
let pipeline_yaml = r#"
processors:
- timestamp:
fields:
- input_str
formats:
- "%Y-%m-%dT%H:%M:%S"
- "%Y-%m-%dT%H:%M:%S%.3fZ"
transform:
- fields:
- input_str, ts
type: time
"#;
let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(output.rows[0].values[0].value_data, TEST_VALUE);
}
#[test]
fn test_timestamp_ignore_missing() {
{
let empty_input = r#"{}"#;
let pipeline_yaml = r#"
processors:
- timestamp:
fields:
- input_str
formats:
- "%Y-%m-%dT%H:%M:%S"
- "%Y-%m-%dT%H:%M:%S%.3fZ"
ignore_missing: true
transform:
- fields:
- input_str, ts
type: time
"#;
let output = common::parse_and_exec(empty_input, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(output.rows[0].values[0].value_data, None);
}
{
let empty_input = r#"{}"#;
let pipeline_yaml = r#"
processors:
- timestamp:
field: input_s
resolution: s
ignore_missing: true
transform:
- fields:
- input_s, ts
type: timestamp, s
"#;
let expected_schema = vec![
common::make_column_schema(
"ts".to_string(),
ColumnDataType::TimestampSecond,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
let output = common::parse_and_exec(empty_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
assert_eq!(output.rows[0].values[0].value_data, None);
}
}
#[test]
fn test_timestamp_timezone() {
let pipeline_yaml = r#"
processors:
- timestamp:
fields:
- input_str
formats:
- ["%Y-%m-%dT%H:%M:%S", "Asia/Shanghai"]
- ["%Y-%m-%dT%H:%M:%S%.3fZ", "Asia/Shanghai"]
ignore_missing: true
transform:
- fields:
- input_str, ts
type: time
"#;
let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(
output.rows[0].values[0].value_data,
Some(ValueData::TimestampNanosecondValue(1719440016991000000))
);
}
#[test]
fn test_timestamp_parse_epoch() {
let test_input = r#"
{
"input_s": "1722580862",
"input_sec": "1722580862",
"input_second": "1722580862",
"input_ms": "1722580887794",
"input_millisecond": "1722580887794",
"input_milli": "1722580887794",
"input_default": "1722580887794",
"input_us": "1722580905423969",
"input_microsecond": "1722580905423969",
"input_micro": "1722580905423969",
"input_ns": "1722580929863842048",
"input_nanosecond": "1722580929863842048",
"input_nano": "1722580929863842048"
}"#;
let pipeline_yaml = r#"
processors:
- timestamp:
field: input_s
resolution: s
- timestamp:
field: input_sec
resolution: sec
- timestamp:
field: input_second
resolution: second
- timestamp:
field: input_ms
resolution: ms
- timestamp:
field: input_millisecond
resolution: millisecond
- timestamp:
field: input_milli
resolution: milli
- timestamp:
field: input_default
- timestamp:
field: input_us
resolution: us
- timestamp:
field: input_microsecond
resolution: microsecond
- timestamp:
field: input_micro
resolution: micro
- timestamp:
field: input_ns
resolution: ns
- timestamp:
field: input_nanosecond
resolution: nanosecond
- timestamp:
field: input_nano
resolution: nano
transform:
- field: input_s
type: timestamp, s
- field: input_sec
type: timestamp, sec
- field: input_second
type: timestamp, second
- field: input_ms
type: timestamp, ms
- field: input_millisecond
type: timestamp, millisecond
- field: input_milli
type: timestamp, milli
- field: input_default
type: timestamp, milli
- field: input_us
type: timestamp, us
- field: input_microsecond
type: timestamp, microsecond
- field: input_micro
type: timestamp, micro
- field: input_ns
type: timestamp, ns
- field: input_nanosecond
type: timestamp, nanosecond
- field: input_nano
type: timestamp, nano
"#;
fn make_time_field(name: &str, datatype: ColumnDataType) -> ColumnSchema {
common::make_column_schema(name.to_string(), datatype, SemanticType::Field)
}
let expected_schema = vec![
make_time_field("input_s", ColumnDataType::TimestampSecond),
make_time_field("input_sec", ColumnDataType::TimestampSecond),
make_time_field("input_second", ColumnDataType::TimestampSecond),
make_time_field("input_ms", ColumnDataType::TimestampMillisecond),
make_time_field("input_millisecond", ColumnDataType::TimestampMillisecond),
make_time_field("input_milli", ColumnDataType::TimestampMillisecond),
make_time_field("input_default", ColumnDataType::TimestampMillisecond),
make_time_field("input_us", ColumnDataType::TimestampMicrosecond),
make_time_field("input_microsecond", ColumnDataType::TimestampMicrosecond),
make_time_field("input_micro", ColumnDataType::TimestampMicrosecond),
make_time_field("input_ns", ColumnDataType::TimestampNanosecond),
make_time_field("input_nanosecond", ColumnDataType::TimestampNanosecond),
make_time_field("input_nano", ColumnDataType::TimestampNanosecond),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
for i in 0..2 {
assert_eq!(
output.rows[0].values[i].value_data,
Some(ValueData::TimestampSecondValue(1722580862))
);
}
for i in 3..6 {
assert_eq!(
output.rows[0].values[i].value_data,
Some(ValueData::TimestampMillisecondValue(1722580887794))
);
}
for i in 7..9 {
assert_eq!(
output.rows[0].values[i].value_data,
Some(ValueData::TimestampMicrosecondValue(1722580905423969))
);
}
for i in 10..12 {
assert_eq!(
output.rows[0].values[i].value_data,
Some(ValueData::TimestampNanosecondValue(1722580929863842048))
);
}
}
#[test]
fn test_timestamp_default_wrong_resolution() {
// same as test_default_wrong_resolution from epoch tests
let test_input = r#"
{
"input_s": "1722580862",
"input_nano": "1722583122284583936"
}"#;
let pipeline_yaml = r#"
processors:
- timestamp:
field: input_s
resolution: s
- timestamp:
field: input_nano
resolution: ns
transform:
- fields:
- input_s
type: timestamp, ms
- fields:
- input_nano
type: timestamp, ms
"#;
let expected_schema = vec![
common::make_column_schema(
"input_s".to_string(),
ColumnDataType::TimestampMillisecond,
SemanticType::Field,
),
common::make_column_schema(
"input_nano".to_string(),
ColumnDataType::TimestampMillisecond,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
assert_eq!(
output.rows[0].values[0].value_data,
Some(ValueData::TimestampMillisecondValue(1722580862000))
);
assert_eq!(
output.rows[0].values[1].value_data,
Some(ValueData::TimestampMillisecondValue(1722583122284))
);
}
#[test]
fn test_timestamp_without_processor() {
let test_input = r#"
{
"input_s": 1722580862,
"input_nano": 1722583122284583936
}"#;
let pipeline_yaml = r#"
transform:
- fields:
- input_s
type: timestamp, s
- fields:
- input_nano
type: timestamp, ns
"#;
let expected_schema = vec![
common::make_column_schema(
"input_s".to_string(),
ColumnDataType::TimestampSecond,
SemanticType::Field,
),
common::make_column_schema(
"input_nano".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
assert_eq!(
output.rows[0].values[0].value_data,
Some(ValueData::TimestampSecondValue(1722580862))
);
assert_eq!(
output.rows[0].values[1].value_data,
Some(ValueData::TimestampNanosecondValue(1722583122284583936))
);
}

View File

@@ -16,16 +16,16 @@ use std::collections::BTreeMap;
use std::sync::Arc;
use ahash::{HashMap, HashMapExt};
use api::v1::{RowInsertRequest, Rows};
use itertools::Itertools;
use pipeline::error::AutoTransformOneTimestampSnafu;
use api::greptime_proto;
use api::v1::{ColumnDataType, ColumnSchema, RowInsertRequest, Rows, SemanticType};
use common_time::timestamp::TimeUnit;
use pipeline::{
unwrap_or_continue_if_err, AutoTransformOutput, ContextReq, DispatchedTo, IdentityTimeIndex,
Pipeline, PipelineContext, PipelineDefinition, PipelineExecOutput, TransformedOutput, Value,
unwrap_or_continue_if_err, ContextReq, DispatchedTo, Pipeline, PipelineContext,
PipelineDefinition, PipelineExecOutput, SchemaInfo, TransformedOutput, TransformerMode, Value,
GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
};
use session::context::{Channel, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;
use crate::error::{CatalogSnafu, PipelineSnafu, Result};
use crate::http::event::PipelineIngestRequest;
@@ -118,12 +118,35 @@ async fn run_custom_pipeline(
let arr_len = pipeline_maps.len();
let mut transformed_map = HashMap::new();
let mut dispatched: BTreeMap<DispatchedTo, Vec<Value>> = BTreeMap::new();
let mut auto_map = HashMap::new();
let mut auto_map_ts_keys = HashMap::new();
let mut schema_info = match pipeline.transformer() {
TransformerMode::GreptimeTransformer(greptime_transformer) => {
SchemaInfo::from_schema_list(greptime_transformer.schemas().clone())
}
TransformerMode::AutoTransform(ts_name, timeunit) => {
let timeunit = match timeunit {
TimeUnit::Second => ColumnDataType::TimestampSecond,
TimeUnit::Millisecond => ColumnDataType::TimestampMillisecond,
TimeUnit::Microsecond => ColumnDataType::TimestampMicrosecond,
TimeUnit::Nanosecond => ColumnDataType::TimestampNanosecond,
};
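// In auto-transform mode the schema is pre-seeded with only the time index column; the remaining columns are discovered while rows are executed.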
let mut schema_info = SchemaInfo::default();
schema_info.schema.push(ColumnSchema {
column_name: ts_name.clone(),
datatype: timeunit.into(),
semantic_type: SemanticType::Timestamp as i32,
datatype_extension: None,
options: None,
});
schema_info
}
};
for pipeline_map in pipeline_maps {
let result = pipeline
.exec_mut(pipeline_map)
.exec_mut(pipeline_map, pipeline_ctx, &mut schema_info)
.inspect_err(|_| {
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
@@ -137,23 +160,10 @@ async fn run_custom_pipeline(
opt,
row,
table_suffix,
pipeline_map: _val,
}) => {
let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
push_to_map!(transformed_map, (opt, act_table_name), row, arr_len);
}
PipelineExecOutput::AutoTransform(AutoTransformOutput {
table_suffix,
ts_unit_map,
pipeline_map,
}) => {
let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
push_to_map!(auto_map, act_table_name.clone(), pipeline_map, arr_len);
auto_map_ts_keys
.entry(act_table_name)
.or_insert_with(HashMap::new)
.extend(ts_unit_map);
}
PipelineExecOutput::DispatchedTo(dispatched_to, val) => {
push_to_map!(dispatched, dispatched_to, val, arr_len);
}
@@ -162,61 +172,24 @@ async fn run_custom_pipeline(
let mut results = ContextReq::default();
if let Some(s) = pipeline.schemas() {
// transformed
let s_len = schema_info.schema.len();
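// Capture the final column count so rows produced before late-added columns can be padded to full width below.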
// if current pipeline generates some transformed results, build it as
// `RowInsertRequest` and append to results. If the pipeline doesn't
// have dispatch, this will be the only output of the pipeline.
for ((opt, table_name), rows) in transformed_map {
results.add_row(
opt,
RowInsertRequest {
rows: Some(Rows {
rows,
schema: s.clone(),
}),
table_name,
},
);
}
} else {
// auto map
for (table_name, pipeline_maps) in auto_map {
if pipeline_maps.is_empty() {
continue;
}
let ts_unit_map = auto_map_ts_keys
.remove(&table_name)
.context(AutoTransformOneTimestampSnafu)
.context(PipelineSnafu)?;
// only one timestamp key is allowed
// which will be converted to ts index
let (ts_key, unit) = ts_unit_map
.into_iter()
.exactly_one()
.map_err(|_| AutoTransformOneTimestampSnafu.build())
.context(PipelineSnafu)?;
let ident_ts_index = IdentityTimeIndex::Epoch(ts_key.to_string(), unit, false);
let new_def = PipelineDefinition::GreptimeIdentityPipeline(Some(ident_ts_index));
let next_pipeline_ctx =
PipelineContext::new(&new_def, pipeline_ctx.pipeline_param, pipeline_ctx.channel);
let reqs = run_identity_pipeline(
handler,
&next_pipeline_ctx,
PipelineIngestRequest {
table: table_name,
values: pipeline_maps,
},
query_ctx,
)
.await?;
results.merge(reqs);
// if transformed
for ((opt, table_name), mut rows) in transformed_map {
for row in rows.iter_mut() {
row.values
.resize(s_len, greptime_proto::v1::Value::default());
}
results.add_row(
opt,
RowInsertRequest {
rows: Some(Rows {
rows,
schema: schema_info.schema.clone(),
}),
table_name,
},
);
}
// if current pipeline contains dispatcher and has several rules, we may

View File

@@ -108,6 +108,7 @@ macro_rules! http_tests {
test_pipeline_context,
test_pipeline_with_vrl,
test_pipeline_with_hint_vrl,
test_pipeline_2,
test_pipeline_skip_error,
test_otlp_metrics,
@@ -2497,6 +2498,87 @@ transform:
guard.remove_all().await;
}
pub async fn test_pipeline_2(storage_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_http_app_with_frontend(storage_type, "test_pipeline_2").await;
// handshake
let client = TestClient::new(app).await;
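// A version 2 pipeline: per the expected schema below, id2 is not listed under transform yet is still stored as a column.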
let pipeline = r#"
version: 2
processors:
- date:
field: time
formats:
- "%Y-%m-%d %H:%M:%S%.3f"
transform:
- field: id1
type: int32
index: inverted
- field: time
type: time
index: timestamp
"#;
// 1. create pipeline
let res = client
.post("/v1/events/pipelines/root")
.header("Content-Type", "application/x-yaml")
.body(pipeline)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// 2. write data
let data_body = r#"
[
{
"id1": "123",
"id2": "2436",
"time": "2024-05-25 20:16:37.217"
}
]
"#;
let res = client
.post("/v1/events/logs?db=public&table=d_table&pipeline_name=root")
.header("Content-Type", "application/json")
.body(data_body)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// CREATE TABLE IF NOT EXISTS "d_table" (
// "id1" INT NULL INVERTED INDEX,
// "time" TIMESTAMP(9) NOT NULL,
// "id2" STRING NULL,
// TIME INDEX ("time")
// )
// ENGINE=mito
// WITH(
// append_mode = 'true'
// )
validate_data(
"test_pipeline_2_schema",
&client,
"show create table d_table",
"[[\"d_table\",\"CREATE TABLE IF NOT EXISTS \\\"d_table\\\" (\\n \\\"id1\\\" INT NULL INVERTED INDEX,\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n \\\"id2\\\" STRING NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]",
)
.await;
validate_data(
"test_pipeline_2_data",
&client,
"select * from d_table",
"[[123,1716668197217000000,\"2436\"]]",
)
.await;
guard.remove_all().await;
}
pub async fn test_identity_pipeline_with_flatten(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =