perf: optimize pipeline performance (#4390)

* chore: improve pipeline performance

* chore: use arc to improve time type

* chore: improve pipeline coerce

* chore: add vec refactor

* chore: add vec pp

* chore: improve pipeline

* chore: in progress

* chore: make log ingester use the new pipeline

* chore: fix some errors per PR comments

* chore: fix typo

* chore: use enum_dispatch to simplify code

* chore: some minor fix

* chore: format code

* chore: update per PR comments

* chore: fix typo

* chore: make clippy happy

* chore: fix per PR comments

* chore: remove epoch and date processors, add new timestamp processor

* chore: add more test for pipeline

* chore: restore epoch and date processors

* chore: fix compatibility issue

* chore: fix per PR comments

* chore: move the evaluation out of the loop

* chore: fix per PR comments

* chore: fix dissect output key filter

* chore: fix ordering error in transform output Greptime values

* chore: keep pipeline transform output order

* chore: revert tests

* chore: simplify pipeline prepare implementation

* chore: add test for timestamp pipeline processor

* chore: make clippy happy

* chore: replace is_some check with match

---------

Co-authored-by: shuiyisong <xixing.sys@gmail.com>
Committed by GitHub on 2024-08-13 19:32:04 +08:00
parent 63e1892dc1
commit 202c730363
31 changed files with 3841 additions and 648 deletions
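At its core, the change replaces the map-in/map-out execution path (Pipeline::exec, which builds a Value::Map per row) with a reusable, index-addressed intermediate vector driven by prepare, exec_mut and reset_intermediate_state. A minimal sketch of the new hot path, assuming an already-parsed Pipeline<GreptimeTransformer>; the run_rows helper is hypothetical and uses only names that appear in the diff below, with error handling elided:

fn run_rows(
    pipeline: &Pipeline<GreptimeTransformer>,
    rows: Vec<serde_json::Value>,
) -> Result<Vec<greptime_proto::v1::Row>, String> {
    // One slot per intermediate key, allocated once and reused for every row.
    let mut state = pipeline.init_intermediate_state();
    let mut out = Vec::with_capacity(rows.len());
    for row in rows {
        pipeline.prepare(row, &mut state)?; // scatter input keys into their slots
        out.push(pipeline.exec_mut(&mut state)?); // processors mutate slots in place
        pipeline.reset_intermediate_state(&mut state); // re-Null the slots, keep the buffer
    }
    Ok(out)
}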

Cargo.lock (generated)

@@ -7768,6 +7768,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
name = "pipeline"
version = "0.9.1"
dependencies = [
"ahash 0.8.11",
"api",
"arrow",
"async-trait",
@@ -7784,6 +7785,7 @@ dependencies = [
"common-runtime",
"common-telemetry",
"common-time",
"criterion 0.4.0",
"crossbeam-utils",
"csv",
"dashmap",
@@ -7793,6 +7795,7 @@ dependencies = [
"datafusion-functions",
"datafusion-physical-expr",
"datatypes",
"enum_dispatch",
"futures",
"greptime-proto",
"itertools 0.10.5",


@@ -10,6 +10,7 @@ license.workspace = true
workspace = true
[dependencies]
ahash = "0.8"
api.workspace = true
arrow.workspace = true
async-trait.workspace = true
@@ -35,6 +36,7 @@ datafusion-expr.workspace = true
datafusion-functions.workspace = true
datafusion-physical-expr.workspace = true
datatypes.workspace = true
enum_dispatch = "0.3"
futures.workspace = true
greptime-proto.workspace = true
itertools.workspace = true
@@ -57,7 +59,13 @@ yaml-rust = "0.4"
[dev-dependencies]
catalog = { workspace = true, features = ["testing"] }
criterion = { version = "0.4", features = ["html_reports"] }
rayon = "1.0"
ron = "0.7"
serde = { version = "1.0", features = ["derive"] }
session = { workspace = true, features = ["testing"] }
[[bench]]
name = "processor"
harness = false
path = "benches/processor.rs"

File diff suppressed because it is too large

benches/processor.rs

@@ -0,0 +1,263 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use pipeline::{parse, Array, Content, GreptimeTransformer, Pipeline, Value as PipelineValue};
use serde_json::{Deserializer, Value};
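// Old batch path: convert the whole input into a Value::Array up front and
// run the map-based exec.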
fn processor_map(
pipeline: &Pipeline<GreptimeTransformer>,
input_values: Vec<Value>,
) -> impl IntoIterator<Item = greptime_proto::v1::Rows> {
let pipeline_data = input_values
.into_iter()
.map(|v| PipelineValue::try_from(v).unwrap())
.collect::<Vec<_>>();
pipeline.exec(PipelineValue::Array(Array {
values: pipeline_data,
}))
}
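// New path: reuse a single intermediate state vector across all rows via
// prepare / exec_mut / reset_intermediate_state.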
fn processor_mut(
pipeline: &Pipeline<GreptimeTransformer>,
input_values: Vec<Value>,
) -> impl IntoIterator<Item = Vec<greptime_proto::v1::Row>> {
let mut payload = pipeline.init_intermediate_state();
let mut result = Vec::with_capacity(input_values.len());
for v in input_values {
pipeline.prepare(v, &mut payload)?;
let r = pipeline.exec_mut(&mut payload)?;
result.push(r);
pipeline.reset_intermediate_state(&mut payload);
}
Ok::<Vec<greptime_proto::v1::Row>, String>(result)
}
fn prepare_pipeline() -> Pipeline<GreptimeTransformer> {
let pipeline_yaml = r#"
---
description: Pipeline for Akamai DataStream2 Log
processors:
- urlencoding:
fields:
- breadcrumbs
- UA
- referer
- queryStr
method: decode
ignore_missing: true
- gsub:
field: reqTimeSec
pattern: "\\."
replacement: ""
- epoch:
field: reqTimeSec
resolution: millisecond
ignore_missing: true
- regex:
field: breadcrumbs
patterns:
- "(?<parent>\\[[^\\[]*c=c[^\\]]*\\])"
- "(?<edge>\\[[^\\[]*c=g[^\\]]*\\])"
- "(?<origin>\\[[^\\[]*c=o[^\\]]*\\])"
- "(?<peer>\\[[^\\[]*c=p[^\\]]*\\])"
- "(?<cloud_wrapper>\\[[^\\[]*c=w[^\\]]*\\])"
ignore_missing: true
- regex:
fields:
- breadcrumbs_parent
- breadcrumbs_edge
- breadcrumbs_origin
- breadcrumbs_peer
- breadcrumbs_cloud_wrapper
ignore_missing: true
patterns:
- "a=(?<ip>[^,\\]]+)"
- "b=(?<request_id>[^,\\]]+)"
- "k=(?<request_end_time>[^,\\]]+)"
- "l=(?<turn_around_time>[^,\\]]+)"
- "m=(?<dns_lookup_time>[^,\\]]+)"
- "n=(?<geo>[^,\\]]+)"
- "o=(?<asn>[^,\\]]+)"
- regex:
field: queryStr, cmcd
patterns:
- "(?i)CMCD=//(?<version>[\\d\\.]+)@V/(?<data>.+$)"
ignore_missing: true
- cmcd:
field: cmcd_data, cmcd
ignore_missing: true
transform:
- fields:
- breadcrumbs
- referer
- queryStr, query_str
- customField, custom_field
- reqId, req_id
- city
- state
- country
- securityRules, security_rules
- ewUsageInfo, ew_usage_info
- ewExecutionInfo, ew_execution_info
- errorCode, error_code
- xForwardedFor, x_forwarded_for
- range
- accLang, acc_lang
- reqMethod, req_method
- reqHost, req_host
- proto
- cliIP, cli_ip
- rspContentType, rsp_content_type
- tlsVersion, tls_version
type: string
- fields:
- version
- cacheStatus, cache_status
- lastByte, last_byte
type: uint8
- fields:
- streamId, stream_id
- billingRegion, billing_region
- transferTimeMSec, transfer_time_msec
- turnAroundTimeMSec, turn_around_time_msec
- reqEndTimeMSec, req_end_time_msec
- maxAgeSec, max_age_sec
- reqPort, req_port
- statusCode, status_code
- cp
- dnsLookupTimeMSec, dns_lookup_time_msec
- tlsOverheadTimeMSec, tls_overhead_time_msec
type: uint32
on_failure: ignore
- fields:
- bytes
- rspContentLen, rsp_content_len
- objSize, obj_size
- uncompressedSize, uncompressed_size
- overheadBytes, overhead_bytes
- totalBytes, total_bytes
type: uint64
on_failure: ignore
- fields:
- UA, user_agent
- cookie
- reqPath, req_path
type: string
# index: fulltext
- field: reqTimeSec, req_time_sec
# epoch time is special, the resolution MUST BE specified
type: epoch, ms
index: time
# the following is from cmcd
- fields:
- cmcd_version
- cmcd_cid, cmcd_content_id
- cmcd_nor, cmcd_next_object_requests
- cmcd_nrr, cmcd_next_range_request
- cmcd_ot, cmcd_object_type
- cmcd_sf, cmcd_streaming_format
- cmcd_sid, cmcd_session_id
- cmcd_st, cmcd_stream_type
- cmcd_v
type: string
- fields:
- cmcd_br, cmcd_encoded_bitrate
- cmcd_bl, cmcd_buffer_length
- cmcd_d, cmcd_object_duration
- cmcd_dl, cmcd_deadline
- cmcd_mtp, cmcd_measured_throughput
- cmcd_rtp, cmcd_requested_max_throughput
- cmcd_tb, cmcd_top_bitrate
type: uint64
- fields:
- cmcd_pr, cmcd_playback_rate
type: float64
- fields:
- cmcd_bs, cmcd_buffer_starvation
- cmcd_su, cmcd_startup
type: boolean
# the following is from breadcrumbs
- fields:
- breadcrumbs_parent_ip
- breadcrumbs_parent_request_id
- breadcrumbs_parent_geo
- breadcrumbs_edge_ip
- breadcrumbs_edge_request_id
- breadcrumbs_edge_geo
- breadcrumbs_origin_ip
- breadcrumbs_origin_request_id
- breadcrumbs_origin_geo
- breadcrumbs_peer_ip
- breadcrumbs_peer_request_id
- breadcrumbs_peer_geo
- breadcrumbs_cloud_wrapper_ip
- breadcrumbs_cloud_wrapper_request_id
- breadcrumbs_cloud_wrapper_geo
type: string
- fields:
- breadcrumbs_parent_request_end_time
- breadcrumbs_parent_turn_around_time
- breadcrumbs_parent_dns_lookup_time
- breadcrumbs_parent_asn
- breadcrumbs_edge_request_end_time
- breadcrumbs_edge_turn_around_time
- breadcrumbs_edge_dns_lookup_time
- breadcrumbs_edge_asn
- breadcrumbs_origin_request_end_time
- breadcrumbs_origin_turn_around_time
- breadcrumbs_origin_dns_lookup_time
- breadcrumbs_origin_asn
- breadcrumbs_peer_request_end_time
- breadcrumbs_peer_turn_around_time
- breadcrumbs_peer_dns_lookup_time
- breadcrumbs_peer_asn
- breadcrumbs_cloud_wrapper_request_end_time
- breadcrumbs_cloud_wrapper_turn_around_time
- breadcrumbs_cloud_wrapper_dns_lookup_time
- breadcrumbs_cloud_wrapper_asn
type: uint32
"#;
parse(&Content::Yaml(pipeline_yaml.into())).unwrap()
}
fn criterion_benchmark(c: &mut Criterion) {
let input_value_str = include_str!("./data.log");
let input_value = Deserializer::from_str(input_value_str)
.into_iter::<serde_json::Value>()
.collect::<Result<Vec<_>, _>>()
.unwrap();
let pipeline = prepare_pipeline();
let mut group = c.benchmark_group("pipeline");
group.sample_size(50);
group.bench_function("processor map", |b| {
b.iter(|| processor_map(black_box(&pipeline), black_box(input_value.clone())))
});
group.bench_function("processor mut", |b| {
b.iter(|| processor_mut(black_box(&pipeline), black_box(input_value.clone())))
});
group.finish();
}
// Benchmark the pipeline's performance converting JSON to Rows.
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
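Since the Cargo.toml above registers this file as the "processor" [[bench]] target with harness = false, the two paths should be comparable with cargo bench --bench processor in the pipeline crate, assuming a benches/data.log sample file is present as the include_str! implies.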


@@ -19,8 +19,12 @@ pub mod processor;
pub mod transform;
pub mod value;
use itertools::Itertools;
use ahash::{HashMap, HashSet};
use common_telemetry::{debug, warn};
use itertools::{merge, Itertools};
use processor::Processor;
use transform::{Transformer, Transforms};
use value::{Map, Value};
use yaml_rust::YamlLoader;
const DESCRIPTION: &str = "description";
@@ -32,6 +36,75 @@ pub enum Content {
Yaml(String),
}
/// Set the index for each processor key.
/// The index is the key's position in the final intermediate keys.
fn set_processor_keys_index(
processors: &mut processor::Processors,
final_intermediate_keys: &Vec<String>,
) -> Result<(), String> {
let final_intermediate_key_index = final_intermediate_keys
.iter()
.enumerate()
.map(|(i, k)| (k.as_str(), i))
.collect::<HashMap<_, _>>();
for processor in processors.iter_mut() {
for field in processor.fields_mut().iter_mut() {
let index = final_intermediate_key_index.get(field.input_field.name.as_str()).ok_or(format!(
"input field {} is not found in intermediate keys: {final_intermediate_keys:?} when set processor keys index",
field.input_field.name
))?;
field.set_input_index(*index);
for (k, v) in field.output_fields_index_mapping.iter_mut() {
let index = final_intermediate_key_index.get(k.as_str());
match index {
Some(index) => {
*v = *index;
}
None => {
warn!(
"output field {k} is not found in intermediate keys: {final_intermediate_keys:?} when set processor keys index"
);
}
}
}
}
}
Ok(())
}
fn set_transform_keys_index(
transforms: &mut Transforms,
final_intermediate_keys: &[String],
output_keys: &[String],
) -> Result<(), String> {
let final_intermediate_key_index = final_intermediate_keys
.iter()
.enumerate()
.map(|(i, k)| (k.as_str(), i))
.collect::<HashMap<_, _>>();
let output_key_index = output_keys
.iter()
.enumerate()
.map(|(i, k)| (k.as_str(), i))
.collect::<HashMap<_, _>>();
for transform in transforms.iter_mut() {
for field in transform.fields.iter_mut() {
let index = final_intermediate_key_index.get(field.input_field.name.as_str()).ok_or(format!(
"input field {} is not found in intermediate keys: {final_intermediate_keys:?} when set transform keys index",
field.input_field.name
))?;
field.set_input_index(*index);
for (k, v) in field.output_fields_index_mapping.iter_mut() {
let index = output_key_index.get(k.as_str()).ok_or(format!(
"output field {k} is not found in output keys: {final_intermediate_keys:?} when set transform keys index"
))?;
*v = *index;
}
}
}
Ok(())
}
pub fn parse<T>(input: &Content) -> Result<Pipeline<T>, String>
where
T: Transformer,
@@ -44,7 +117,7 @@ where
let description = doc[DESCRIPTION].as_str().map(|s| s.to_string());
let processors = if let Some(v) = doc[PROCESSORS].as_vec() {
let mut processors = if let Some(v) = doc[PROCESSORS].as_vec() {
v.try_into()?
} else {
processor::Processors::default()
@@ -56,17 +129,78 @@ where
Transforms::default()
};
let mut transformer = T::new(transforms)?;
let transforms = transformer.transforms_mut();
let processors_output_keys = processors.output_keys();
let processors_required_keys = processors.required_keys();
let processors_required_original_keys = processors.required_original_keys();
debug!(
"processors_required_original_keys: {:?}",
processors_required_original_keys
);
debug!("processors_required_keys: {:?}", processors_required_keys);
debug!("processors_output_keys: {:?}", processors_output_keys);
let transforms_required_keys = transforms.required_keys();
let mut tr_keys = Vec::with_capacity(50);
for key in transforms_required_keys.iter() {
if !processors_output_keys.contains(key)
&& !processors_required_original_keys.contains(key)
{
tr_keys.push(key.clone());
}
}
let mut required_keys = processors_required_original_keys.clone();
required_keys.append(&mut tr_keys);
required_keys.sort();
debug!("required_keys: {:?}", required_keys);
// intermediate keys are the keys required by all processors and the transformer
let ordered_intermediate_keys: Vec<String> =
merge(processors_required_keys, transforms_required_keys)
.cloned()
.collect::<HashSet<String>>()
.into_iter()
.sorted()
.collect();
let mut final_intermediate_keys = Vec::with_capacity(ordered_intermediate_keys.len());
let mut intermediate_keys_exclude_original =
Vec::with_capacity(ordered_intermediate_keys.len());
for key_name in ordered_intermediate_keys.iter() {
if required_keys.contains(key_name) {
final_intermediate_keys.push(key_name.clone());
} else {
intermediate_keys_exclude_original.push(key_name.clone());
}
}
final_intermediate_keys.extend(intermediate_keys_exclude_original);
let output_keys = transforms.output_keys().clone();
set_processor_keys_index(&mut processors, &final_intermediate_keys)?;
set_transform_keys_index(transforms, &final_intermediate_keys, &output_keys)?;
Ok(Pipeline {
description,
processors,
transformer: T::new(transforms)?,
transformer,
required_keys,
output_keys,
intermediate_keys: final_intermediate_keys,
})
}
Content::Json(_) => unimplemented!(),
}
}
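// Worked example (not part of the diff): in the csv test pipeline below, the
// csv processor reads "my_field" and emits "field1" and "field2", and the
// transformer additionally requires "greptime_timestamp". So required_keys
// (expected in user input) == ["greptime_timestamp", "my_field"], and
// final_intermediate_keys puts the required keys first, then the remaining
// processor outputs: ["greptime_timestamp", "my_field", "field1", "field2"].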
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct Pipeline<T>
where
T: Transformer,
@@ -74,6 +208,13 @@ where
description: Option<String>,
processors: processor::Processors,
transformer: T,
/// Required keys for preprocessing the map data supplied by the user;
/// includes all keys required by the processors and the transformer.
required_keys: Vec<String>,
/// all output keys from the transformer
output_keys: Vec<String>,
/// intermediate keys from the processors
intermediate_keys: Vec<String>,
// pub on_failure: processor::Processors,
}
@@ -97,23 +238,263 @@ impl<T> Pipeline<T>
where
T: Transformer,
{
pub fn exec(&self, val: value::Value) -> Result<T::Output, String> {
let mut val = val;
fn exec_map(&self, map: &mut Map) -> Result<(), String> {
let v = map;
for processor in self.processors.iter() {
val = processor.exec(val)?;
processor.exec_map(v)?;
}
Ok(())
}
pub fn exec(&self, mut val: Value) -> Result<T::Output, String> {
let result = match val {
Value::Map(ref mut map) => {
self.exec_map(map)?;
val
}
Value::Array(arr) => arr
.values
.into_iter()
.map(|mut v| match v {
Value::Map(ref mut map) => {
self.exec_map(map)?;
Ok(v)
}
_ => Err(format!("expected a map, but got {}", v)),
})
.collect::<Result<Vec<Value>, String>>()
.map(|values| Value::Array(value::Array { values }))?,
_ => return Err(format!("expected a map or array, but got {}", val)),
};
self.transformer.transform(result)
}
pub fn exec_mut(&self, val: &mut Vec<Value>) -> Result<T::VecOutput, String> {
for processor in self.processors.iter() {
processor.exec_mut(val)?;
}
self.transformer.transform(val)
self.transformer.transform_mut(val)
}
pub fn prepare(&self, val: serde_json::Value, result: &mut [Value]) -> Result<(), String> {
match val {
serde_json::Value::Object(map) => {
let mut search_from = 0;
// the keys in the JSON map are sorted
for (payload_key, payload_value) in map.into_iter() {
if search_from >= self.required_keys.len() - 1 {
break;
}
// since map keys are sorted and required_keys is sorted, scan forward only
if let Some(pos) = self.required_keys[search_from..]
.iter()
.position(|k| k == &payload_key)
{
result[search_from + pos] = payload_value.try_into()?;
// the next search always starts at or after the current position
search_from += pos;
}
}
}
serde_json::Value::String(_) => {
result[0] = val.try_into()?;
}
_ => {
return Err("expect object".to_string());
}
}
Ok(())
}
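// Note: prepare relies on both sides being sorted — serde_json's map
// iterates keys in sorted order (it is a BTreeMap unless the preserve_order
// feature is enabled) and required_keys is sorted at parse time — so
// search_from only moves forward and each input key is matched in one pass.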
pub fn init_intermediate_state(&self) -> Vec<Value> {
vec![Value::Null; self.intermediate_keys.len()]
}
pub fn reset_intermediate_state(&self, result: &mut [Value]) {
for i in result {
*i = Value::Null;
}
}
pub fn processors(&self) -> &processor::Processors {
&self.processors
}
pub fn transformer(&self) -> &T {
&self.transformer
}
/// Required fields in user-supplied data
pub fn required_keys(&self) -> &Vec<String> {
&self.required_keys
}
/// All output keys from the pipeline
pub fn output_keys(&self) -> &Vec<String> {
&self.output_keys
}
/// intermediate keys from the processors
pub fn intermediate_keys(&self) -> &Vec<String> {
&self.intermediate_keys
}
pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
self.transformer.schemas()
}
}
#[cfg(test)]
mod tests {
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{self, ColumnDataType, SemanticType};
use crate::etl::transform::GreptimeTransformer;
use crate::etl::{parse, Content, Pipeline};
use crate::Value;
#[test]
fn test_pipeline_prepare() {
let input_value_str = r#"
{
"my_field": "1,2",
"foo": "bar"
}
"#;
let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();
let pipeline_yaml = r#"
---
description: Pipeline for Apache Tomcat
processors:
- csv:
field: my_field, my_field,field1, field2
transform:
- field: field1
type: uint32
- field: field2
type: uint32
"#;
let pipeline: Pipeline<GreptimeTransformer> =
parse(&Content::Yaml(pipeline_yaml.into())).unwrap();
let mut payload = pipeline.init_intermediate_state();
pipeline.prepare(input_value, &mut payload).unwrap();
assert_eq!(
&["greptime_timestamp", "my_field"].to_vec(),
pipeline.required_keys()
);
assert_eq!(
payload,
vec![
Value::Null,
Value::String("1,2".to_string()),
Value::Null,
Value::Null
]
);
let result = pipeline.exec_mut(&mut payload).unwrap();
assert_eq!(result.values[0].value_data, Some(ValueData::U32Value(1)));
assert_eq!(result.values[1].value_data, Some(ValueData::U32Value(2)));
match &result.values[2].value_data {
Some(ValueData::TimestampNanosecondValue(v)) => {
assert_ne!(*v, 0);
}
_ => panic!("expected timestamp nanosecond value"),
}
}
#[test]
fn test_dissect_pipeline() {
let message = r#"129.37.245.88 - meln1ks [01/Aug/2024:14:22:47 +0800] "PATCH /observability/metrics/production HTTP/1.0" 501 33085"#.to_string();
let pipeline_str = r#"processors:
- dissect:
fields:
- message
patterns:
- "%{ip} %{?ignored} %{username} [%{ts}] \"%{method} %{path} %{proto}\" %{status} %{bytes}"
- timestamp:
fields:
- ts
formats:
- "%d/%b/%Y:%H:%M:%S %z"
transform:
- fields:
- ip
- username
- method
- path
- proto
type: string
- fields:
- status
type: uint16
- fields:
- bytes
type: uint32
- field: ts
type: timestamp, ns
index: time"#;
let pipeline: Pipeline<GreptimeTransformer> =
parse(&Content::Yaml(pipeline_str.into())).unwrap();
let mut payload = pipeline.init_intermediate_state();
pipeline
.prepare(serde_json::Value::String(message), &mut payload)
.unwrap();
let result = pipeline.exec_mut(&mut payload).unwrap();
let schemas = pipeline.schemas();
assert_eq!(schemas.len(), result.values.len());
let test = vec![
(
ColumnDataType::String as i32,
Some(ValueData::StringValue("129.37.245.88".into())),
),
(
ColumnDataType::String as i32,
Some(ValueData::StringValue("meln1ks".into())),
),
(
ColumnDataType::String as i32,
Some(ValueData::StringValue("PATCH".into())),
),
(
ColumnDataType::String as i32,
Some(ValueData::StringValue(
"/observability/metrics/production".into(),
)),
),
(
ColumnDataType::String as i32,
Some(ValueData::StringValue("HTTP/1.0".into())),
),
(
ColumnDataType::Uint16 as i32,
Some(ValueData::U16Value(501)),
),
(
ColumnDataType::Uint32 as i32,
Some(ValueData::U32Value(33085)),
),
(
ColumnDataType::TimestampNanosecond as i32,
Some(ValueData::TimestampNanosecondValue(1722493367000000000)),
),
];
for i in 0..schemas.len() {
let schema = &schemas[i];
let value = &result.values[i];
assert_eq!(schema.datatype, test[i].0);
assert_eq!(value.value_data, test[i].1);
}
}
#[test]
fn test_csv_pipeline() {
@@ -131,7 +512,7 @@ description: Pipeline for Apache Tomcat
processors:
- csv:
field: my_field, field1, field2
field: my_field,my_field, field1, field2
transform:
- field: field1
@@ -162,13 +543,13 @@ transform:
description: Pipeline for Apache Tomcat
processors:
- date:
- timestamp:
field: test_time
transform:
- field: test_time
type: time
index: timestamp
type: timestamp, ns
index: time
"#;
let pipeline: Pipeline<GreptimeTransformer> =


@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use ahash::{HashSet, HashSetExt};
use itertools::Itertools;
#[derive(Debug, Default, Clone)]
@@ -36,15 +39,15 @@ impl Fields {
return Err("fields must not be empty".to_string());
}
let mut set = std::collections::HashSet::new();
let mut set = HashSet::new();
for f in self.0.iter() {
if set.contains(&f.field) {
if set.contains(&f.input_field.name) {
return Err(format!(
"field name must be unique, but got duplicated: {}",
f.field
f.input_field.name
));
}
set.insert(&f.field);
set.insert(&f.input_field.name);
}
Ok(self)
@@ -66,9 +69,42 @@ impl std::ops::Deref for Fields {
}
}
impl std::ops::DerefMut for Fields {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
#[derive(Debug, Default, Clone)]
pub struct InputFieldInfo {
pub(crate) name: String,
pub(crate) index: usize,
}
impl InputFieldInfo {
pub(crate) fn new(field: impl Into<String>, index: usize) -> Self {
InputFieldInfo {
name: field.into(),
index,
}
}
pub(crate) fn name(field: impl Into<String>) -> Self {
InputFieldInfo {
name: field.into(),
index: 0,
}
}
}
/// Used to represent the input and output fields of a processor or transform.
#[derive(Debug, Default, Clone)]
pub struct Field {
pub field: String,
/// The input field name and index.
pub input_field: InputFieldInfo,
/// The output field name and index mapping.
pub output_fields_index_mapping: BTreeMap<String, usize>,
// rename
pub target_field: Option<String>,
@@ -82,19 +118,39 @@ pub struct Field {
impl Field {
pub(crate) fn new(field: impl Into<String>) -> Self {
Field {
field: field.into(),
input_field: InputFieldInfo::name(field.into()),
output_fields_index_mapping: BTreeMap::new(),
target_field: None,
target_fields: None,
}
}
// column_name in transform
/// Target column_name in a processor or transform;
/// if target_field is None, the input field name is returned.
pub(crate) fn get_target_field(&self) -> &str {
self.target_field.as_deref().unwrap_or(&self.field)
self.target_field
.as_deref()
.unwrap_or(&self.input_field.name)
}
pub(crate) fn get_field(&self) -> &str {
&self.field
/// input column_name in processor or transform
pub(crate) fn get_field_name(&self) -> &str {
&self.input_field.name
}
/// set input column index in processor or transform
pub(crate) fn set_input_index(&mut self, index: usize) {
self.input_field.index = index;
}
pub(crate) fn set_output_index(&mut self, key: &str, index: usize) {
if let Some(v) = self.output_fields_index_mapping.get_mut(key) {
*v = index;
}
}
pub(crate) fn insert_output_index(&mut self, key: String, index: usize) {
self.output_fields_index_mapping.insert(key, index);
}
}
@@ -109,14 +165,18 @@ impl std::str::FromStr for Field {
return Err("field is empty".to_string());
}
let target_field = match parts.next() {
let renamed_field = match parts.next() {
Some(s) if !s.trim().is_empty() => Some(s.trim().to_string()),
_ => None,
};
// TODO(qtang): ???? what's this?
// weird design? field: <field>,<target_field>,<target_fields>,<target_fields>....
// and only used in the csv processor
let fields: Vec<_> = parts
.filter(|s| !s.trim().is_empty())
.map(|s| s.trim().to_string())
.map(|s| s.trim())
.filter(|s| !s.is_empty())
.map(|s| s.to_string())
.collect();
let target_fields = if fields.is_empty() {
None
@@ -125,8 +185,9 @@ impl std::str::FromStr for Field {
};
Ok(Field {
field,
target_field,
input_field: InputFieldInfo::name(field),
output_fields_index_mapping: BTreeMap::new(),
target_field: renamed_field,
target_fields,
})
}
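// Example: "my_field, new_name, col1, col2" parses to input field "my_field",
// rename target "new_name", and target_fields ["col1", "col2"]; the trailing
// list form is only used by the csv processor (see the TODO above).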
@@ -135,11 +196,16 @@ impl std::str::FromStr for Field {
impl std::fmt::Display for Field {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match (&self.target_field, &self.target_fields) {
(Some(target_field), None) => write!(f, "{}, {target_field}", self.field),
(Some(target_field), None) => write!(f, "{}, {target_field}", self.input_field.name),
(None, Some(target_fields)) => {
write!(f, "{}, {}", self.field, target_fields.iter().join(","))
write!(
f,
"{}, {}",
self.input_field.name,
target_fields.iter().join(",")
)
}
_ => write!(f, "{}", self.field),
_ => write!(f, "{}", self.input_field.name),
}
}
}
@@ -187,7 +253,7 @@ mod tests {
for (s, field, target_field, target_fields) in cases.into_iter() {
let f: Field = s.parse().unwrap();
assert_eq!(f.get_field(), field, "{s}");
assert_eq!(f.get_field_name(), field, "{s}");
assert_eq!(f.target_field, target_field, "{s}");
assert_eq!(f.target_fields, target_fields, "{s}");
}


@@ -21,24 +21,26 @@ pub mod gsub;
pub mod join;
pub mod letter;
pub mod regex;
pub mod timestamp;
pub mod urlencoding;
use std::sync::Arc;
use cmcd::CMCDProcessor;
use common_telemetry::warn;
use ahash::{HashSet, HashSetExt};
use cmcd::CmcdProcessor;
use csv::CsvProcessor;
use date::DateProcessor;
use dissect::DissectProcessor;
use enum_dispatch::enum_dispatch;
use epoch::EpochProcessor;
use gsub::GsubProcessor;
use itertools::Itertools;
use join::JoinProcessor;
use letter::LetterProcessor;
use regex::RegexProcessor;
use timestamp::TimestampProcessor;
use urlencoding::UrlEncodingProcessor;
use crate::etl::field::{Field, Fields};
use crate::etl::value::{Array, Map, Value};
use crate::etl::value::{Map, Value};
const FIELD_NAME: &str = "field";
const FIELDS_NAME: &str = "fields";
@@ -53,112 +55,171 @@ const SEPARATOR_NAME: &str = "separator";
// const ON_FAILURE_NAME: &str = "on_failure";
// const TAG_NAME: &str = "tag";
/// The Processor trait defines the interface for all processors.
/// A processor is a transformation that can be applied to a field in a document;
/// it can be used to extract, transform, or enrich data.
/// For now a Processor has only one input field; multiple input fields may be supported in the future.
/// The output of a processor is a map of key-value pairs that is merged into the document when the exec_map method is used.
#[enum_dispatch(ProcessorKind)]
pub trait Processor: std::fmt::Debug + Send + Sync + 'static {
/// Get the processor's fields.
/// Multiple fields mean the same processor is applied to multiple keys, not that the processor takes multiple inputs.
fn fields(&self) -> &Fields;
/// Get the processor's fields mutably
fn fields_mut(&mut self) -> &mut Fields;
/// Get the processor's kind
fn kind(&self) -> &str;
/// Whether to ignore missing
fn ignore_missing(&self) -> bool;
fn ignore_processor_array_failure(&self) -> bool {
true
}
/// All of the processor's output keys.
/// If a processor has multiple output keys, it returns all of them.
fn output_keys(&self) -> HashSet<String>;
/// default behavior does nothing and returns the input value
fn exec_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
Ok(Map::one(field.get_field(), val.clone()))
}
/// Execute the processor on a document
/// and return a map of key-value pairs
fn exec_field(&self, val: &Value, field: &Field) -> Result<Map, String>;
fn exec_map(&self, mut map: Map) -> Result<Value, String> {
for ff @ Field { field, .. } in self.fields().iter() {
match map.get(field) {
/// Execute the processor on a vector that has been preprocessed by the pipeline
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<(), String>;
/// Execute the processor on a map
/// and merge the output into the original map
fn exec_map(&self, map: &mut Map) -> Result<(), String> {
for ff @ Field {
input_field: field_info,
..
} in self.fields().iter()
{
match map.get(&field_info.name) {
Some(v) => {
map.extend(self.exec_field(v, ff)?);
}
None if self.ignore_missing() => {}
None => {
return Err(format!(
"{} processor: field '{field}' is required but missing in {map}",
"{} processor: field '{}' is required but missing in {map}",
self.kind(),
field_info.name,
))
}
}
}
Ok(Value::Map(map))
}
fn exec_array(&self, arr: Array) -> Result<Value, String> {
let mut values = vec![];
for val in arr.into_iter() {
match val {
Value::Map(map) => {
values.push(self.exec_map(map)?);
}
Value::String(_) => {
let fields = self.fields();
if fields.len() != 1 {
return Err(format!(
"{} processor: expected fields length 1 when processing line input, but got {}",
self.kind(),
fields.len()
));
}
let field = fields.first().unwrap();
values.push(self.exec_field(&val, field).map(Value::Map)?);
}
_ if self.ignore_processor_array_failure() => {
warn!("expected a map, but got {val}")
}
_ => return Err(format!("expected a map, but got {}", val)),
}
}
Ok(Value::Array(Array { values }))
}
fn exec(&self, val: Value) -> Result<Value, String> {
match val {
Value::Map(map) => self.exec_map(map),
Value::Array(arr) => self.exec_array(arr),
_ => Err(format!("expected a map or array, but got {}", val)),
}
Ok(())
}
}
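// Design note: #[enum_dispatch] replaces the previous Arc<dyn Processor>
// dynamic dispatch with an enum whose trait impl is generated, roughly:
//
// impl Processor for ProcessorKind {
//     fn kind(&self) -> &str {
//         match self {
//             ProcessorKind::Cmcd(p) => p.kind(),
//             ProcessorKind::Csv(p) => p.kind(),
//             // ...one arm per variant, and likewise for every trait method
//         }
//     }
// }
//
// so each processor call is statically dispatched with no heap allocation.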
#[derive(Debug, Default, Clone)]
#[derive(Debug)]
#[enum_dispatch]
pub enum ProcessorKind {
Cmcd(CmcdProcessor),
Csv(CsvProcessor),
Dissect(DissectProcessor),
Gsub(GsubProcessor),
Join(JoinProcessor),
Letter(LetterProcessor),
Regex(RegexProcessor),
Timestamp(TimestampProcessor),
UrlEncoding(UrlEncodingProcessor),
Epoch(EpochProcessor),
Date(DateProcessor),
}
#[derive(Debug, Default)]
pub struct Processors {
pub processors: Vec<Arc<dyn Processor>>,
}
impl Processors {
pub fn new() -> Self {
Processors { processors: vec![] }
}
/// An ordered list of processors.
/// Order matters: the output of one processor becomes the input of the next.
pub processors: Vec<ProcessorKind>,
/// all required keys in all processors
pub required_keys: Vec<String>,
/// all required keys in user-supplied data, not pipeline output fields
pub required_original_keys: Vec<String>,
/// all output keys in all processors
pub output_keys: Vec<String>,
}
impl std::ops::Deref for Processors {
type Target = Vec<Arc<dyn Processor>>;
type Target = Vec<ProcessorKind>;
fn deref(&self) -> &Self::Target {
&self.processors
}
}
impl std::ops::DerefMut for Processors {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.processors
}
}
impl Processors {
/// A collection of every processor's required input fields
pub fn required_keys(&self) -> &Vec<String> {
&self.required_keys
}
/// A collection of every processor's output fields
pub fn output_keys(&self) -> &Vec<String> {
&self.output_keys
}
/// Required fields in user-supplied data, not pipeline output fields.
pub fn required_original_keys(&self) -> &Vec<String> {
&self.required_original_keys
}
}
impl TryFrom<&Vec<yaml_rust::Yaml>> for Processors {
type Error = String;
fn try_from(vec: &Vec<yaml_rust::Yaml>) -> Result<Self, Self::Error> {
let mut processors = vec![];
let mut all_output_keys = HashSet::with_capacity(50);
let mut all_required_keys = HashSet::with_capacity(50);
let mut all_required_original_keys = HashSet::with_capacity(50);
for doc in vec {
processors.push(parse_processor(doc)?);
let processor = parse_processor(doc)?;
// get all required keys
let processor_required_keys: Vec<String> = processor
.fields()
.iter()
.map(|f| f.input_field.name.clone())
.collect();
for key in &processor_required_keys {
if !all_output_keys.contains(key) {
all_required_original_keys.insert(key.clone());
}
}
all_required_keys.extend(processor_required_keys);
let processor_output_keys = processor.output_keys().into_iter();
all_output_keys.extend(processor_output_keys);
processors.push(processor);
}
Ok(Processors { processors })
let all_required_keys = all_required_keys.into_iter().sorted().collect();
let all_output_keys = all_output_keys.into_iter().sorted().collect();
let all_required_original_keys = all_required_original_keys.into_iter().sorted().collect();
Ok(Processors {
processors,
required_keys: all_required_keys,
output_keys: all_output_keys,
required_original_keys: all_required_original_keys,
})
}
}
fn parse_processor(doc: &yaml_rust::Yaml) -> Result<Arc<dyn Processor>, String> {
fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorKind, String> {
let map = doc.as_hash().ok_or("processor must be a map".to_string())?;
let key = map
@@ -176,17 +237,22 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<Arc<dyn Processor>, String>
.as_str()
.ok_or("processor key must be a string".to_string())?;
let processor: Arc<dyn Processor> = match str_key {
cmcd::PROCESSOR_CMCD => Arc::new(CMCDProcessor::try_from(value)?),
csv::PROCESSOR_CSV => Arc::new(CsvProcessor::try_from(value)?),
date::PROCESSOR_DATE => Arc::new(DateProcessor::try_from(value)?),
dissect::PROCESSOR_DISSECT => Arc::new(DissectProcessor::try_from(value)?),
epoch::PROCESSOR_EPOCH => Arc::new(EpochProcessor::try_from(value)?),
gsub::PROCESSOR_GSUB => Arc::new(GsubProcessor::try_from(value)?),
join::PROCESSOR_JOIN => Arc::new(JoinProcessor::try_from(value)?),
letter::PROCESSOR_LETTER => Arc::new(LetterProcessor::try_from(value)?),
regex::PROCESSOR_REGEX => Arc::new(RegexProcessor::try_from(value)?),
urlencoding::PROCESSOR_URL_ENCODING => Arc::new(UrlEncodingProcessor::try_from(value)?),
let processor = match str_key {
cmcd::PROCESSOR_CMCD => ProcessorKind::Cmcd(CmcdProcessor::try_from(value)?),
csv::PROCESSOR_CSV => ProcessorKind::Csv(CsvProcessor::try_from(value)?),
dissect::PROCESSOR_DISSECT => ProcessorKind::Dissect(DissectProcessor::try_from(value)?),
epoch::PROCESSOR_EPOCH => ProcessorKind::Epoch(EpochProcessor::try_from(value)?),
date::PROCESSOR_DATE => ProcessorKind::Date(DateProcessor::try_from(value)?),
gsub::PROCESSOR_GSUB => ProcessorKind::Gsub(GsubProcessor::try_from(value)?),
join::PROCESSOR_JOIN => ProcessorKind::Join(JoinProcessor::try_from(value)?),
letter::PROCESSOR_LETTER => ProcessorKind::Letter(LetterProcessor::try_from(value)?),
regex::PROCESSOR_REGEX => ProcessorKind::Regex(RegexProcessor::try_from(value)?),
timestamp::PROCESSOR_TIMESTAMP => {
ProcessorKind::Timestamp(TimestampProcessor::try_from(value)?)
}
urlencoding::PROCESSOR_URL_ENCODING => {
ProcessorKind::UrlEncoding(UrlEncodingProcessor::try_from(value)?)
}
_ => return Err(format!("unsupported {} processor", str_key)),
};
@@ -243,3 +309,11 @@ pub(crate) fn yaml_fields(v: &yaml_rust::Yaml, field: &str) -> Result<Fields, St
pub(crate) fn yaml_field(v: &yaml_rust::Yaml, field: &str) -> Result<Field, String> {
yaml_parse_string(v, field)
}
pub(crate) fn update_one_one_output_keys(fields: &mut Fields) {
for field in fields.iter_mut() {
field
.output_fields_index_mapping
.insert(field.get_target_field().to_string(), 0_usize);
}
}
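// Example: for a one-to-one processor (date, epoch, gsub, ...) configured as
// "status, status_code", this records "status_code" -> 0 as a placeholder;
// set_processor_keys_index later overwrites it with the key's real position
// in the final intermediate keys.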


@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use ahash::HashSet;
use urlencoding::decode;
use crate::etl::field::{Field, Fields};
@@ -41,6 +42,27 @@ const CMCD_KEY_SU: &str = "su"; // Startup, Boolean
const CMCD_KEY_TB: &str = "tb"; // Top bitrate, Integer kbps
const CMCD_KEY_V: &str = "v"; // Version
const CMCD_KEYS: [&str; 18] = [
CMCD_KEY_BR,
CMCD_KEY_BL,
CMCD_KEY_BS,
CMCD_KEY_CID,
CMCD_KEY_D,
CMCD_KEY_DL,
CMCD_KEY_MTP,
CMCD_KEY_NOR,
CMCD_KEY_NRR,
CMCD_KEY_OT,
CMCD_KEY_PR,
CMCD_KEY_RTP,
CMCD_KEY_SF,
CMCD_KEY_SID,
CMCD_KEY_ST,
CMCD_KEY_SU,
CMCD_KEY_TB,
CMCD_KEY_V,
];
/// Common Media Client Data Specification:
/// https://cdn.cta.tech/cta/media/media/resources/standards/pdfs/cta-5004-final.pdf
///
@@ -77,14 +99,15 @@ const CMCD_KEY_V: &str = "v"; // Version
/// 11. The data payload syntax is intended to be compliant with Structured Field Values for HTTP [6].
/// 12. Transport Layer Security SHOULD be used to protect all transmission of CMCD data.
#[derive(Debug, Default)]
pub struct CMCDProcessor {
pub struct CmcdProcessor {
fields: Fields,
ignore_missing: bool,
}
impl CMCDProcessor {
fn with_fields(&mut self, fields: Fields) {
impl CmcdProcessor {
fn with_fields(&mut self, mut fields: Fields) {
Self::update_output_keys(&mut fields);
self.fields = fields;
}
@@ -92,6 +115,10 @@ impl CMCDProcessor {
self.ignore_missing = ignore_missing;
}
fn generate_key(prefix: &str, key: &str) -> String {
format!("{}_{}", prefix, key)
}
fn parse(prefix: &str, s: &str) -> Result<Map, String> {
let mut map = Map::default();
let parts = s.split(',');
@@ -100,7 +127,7 @@ impl CMCDProcessor {
let k = kv.next().ok_or(format!("{part} missing key in {s}"))?;
let v = kv.next();
let key = format!("{prefix}_{k}");
let key = Self::generate_key(prefix, k);
match k {
CMCD_KEY_BS | CMCD_KEY_SU => {
map.insert(key, Value::Boolean(true));
@@ -144,20 +171,27 @@ impl CMCDProcessor {
}
fn process_field(&self, val: &str, field: &Field) -> Result<Map, String> {
let prefix = match field.target_field {
Some(ref target_field) => target_field,
None => field.get_field(),
};
let prefix = field.get_target_field();
Self::parse(prefix, val)
}
fn update_output_keys(fields: &mut Fields) {
for field in fields.iter_mut() {
for key in CMCD_KEYS.iter() {
field
.output_fields_index_mapping
.insert(Self::generate_key(field.get_target_field(), key), 0);
}
}
}
}
impl TryFrom<&yaml_rust::yaml::Hash> for CMCDProcessor {
impl TryFrom<&yaml_rust::yaml::Hash> for CmcdProcessor {
type Error = String;
fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
let mut processor = CMCDProcessor::default();
let mut processor = CmcdProcessor::default();
for (k, v) in value.iter() {
let key = k
@@ -183,7 +217,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for CMCDProcessor {
}
}
impl crate::etl::processor::Processor for CMCDProcessor {
impl crate::etl::processor::Processor for CmcdProcessor {
fn kind(&self) -> &str {
PROCESSOR_CMCD
}
@@ -196,6 +230,27 @@ impl crate::etl::processor::Processor for CMCDProcessor {
&self.fields
}
fn fields_mut(&mut self) -> &mut Fields {
&mut self.fields
}
fn output_keys(&self) -> HashSet<String> {
self.fields
.iter()
.map(|field| {
field
.target_field
.clone()
.unwrap_or_else(|| field.get_field_name().to_string())
})
.flat_map(|keys| {
CMCD_KEYS
.iter()
.map(move |key| format!("{}_{}", keys, *key))
})
.collect()
}
fn exec_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
match val {
Value::String(val) => self.process_field(val, field),
@@ -205,15 +260,46 @@ impl crate::etl::processor::Processor for CMCDProcessor {
)),
}
}
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<(), String> {
for field in self.fields.iter() {
match val.get(field.input_field.index) {
Some(Value::String(v)) => {
// TODO(qtang): Let this method use the intermediate state collection directly.
let map = self.process_field(v, field)?;
for (k, v) in map.values.into_iter() {
if let Some(index) = field.output_fields_index_mapping.get(&k) {
val[*index] = v;
}
}
}
Some(Value::Null) | None => {
if !self.ignore_missing {
return Err(format!(
"{} processor: missing field: {}",
self.kind(),
field.get_field_name()
));
}
}
Some(v) => {
return Err(format!(
"{} processor: expect string value, but got {v:?}",
self.kind()
));
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use ahash::HashMap;
use urlencoding::decode;
use super::CMCDProcessor;
use super::CmcdProcessor;
use crate::etl::value::{Map, Value};
#[test]
@@ -354,7 +440,7 @@ mod tests {
.collect::<HashMap<String, Value>>();
let expected = Map { values };
let actual = CMCDProcessor::parse("prefix", &decoded).unwrap();
let actual = CmcdProcessor::parse("prefix", &decoded).unwrap();
assert_eq!(actual, expected);
}
}


@@ -14,8 +14,7 @@
// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/csv-processor.html
use std::collections::HashMap;
use ahash::{HashMap, HashSet};
use csv::{ReaderBuilder, Trim};
use itertools::EitherOrBoth::{Both, Left, Right};
use itertools::Itertools;
@@ -121,7 +120,7 @@ impl CsvProcessor {
.as_ref()
.ok_or(format!(
"target fields must be set after '{}'",
field.get_field()
field.get_field_name()
))?
.iter()
.map(|f| f.to_string())
@@ -147,6 +146,18 @@ impl CsvProcessor {
Err("expected at least one record from csv format, but got none".into())
}
}
fn update_output_keys(&mut self) {
self.fields.iter_mut().for_each(|f| {
if let Some(tfs) = f.target_fields.as_ref() {
tfs.iter().for_each(|tf| {
if !tf.is_empty() {
f.output_fields_index_mapping.insert(tf.to_string(), 0);
}
});
}
})
}
}
impl TryFrom<&yaml_rust::yaml::Hash> for CsvProcessor {
@@ -184,7 +195,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for CsvProcessor {
_ => {}
}
}
processor.update_output_keys();
Ok(processor)
}
}
@@ -202,6 +213,17 @@ impl Processor for CsvProcessor {
&self.fields
}
fn fields_mut(&mut self) -> &mut Fields {
&mut self.fields
}
fn output_keys(&self) -> HashSet<String> {
self.fields
.iter()
.flat_map(|f| f.target_fields.clone().unwrap_or_default())
.collect()
}
fn exec_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
match val {
Value::String(val) => self.process_field(val, field),
@@ -211,12 +233,44 @@ impl Processor for CsvProcessor {
)),
}
}
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<(), String> {
for field in self.fields.iter() {
match val.get(field.input_field.index) {
Some(Value::String(v)) => {
// TODO(qtang): Let this method use the intermediate state collection directly.
let map = self.process_field(v, field)?;
for (k, v) in map.values.into_iter() {
if let Some(index) = field.output_fields_index_mapping.get(&k) {
val[*index] = v;
}
}
}
Some(Value::Null) | None => {
if !self.ignore_missing {
return Err(format!(
"{} processor: missing field: {}",
self.kind(),
field.get_field_name()
));
}
}
Some(v) => {
return Err(format!(
"{} processor: expect string value, but got {v:?}",
self.kind()
));
}
}
}
Ok(())
}
}
// TODO(yuanbohan): more test cases
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use ahash::HashMap;
use super::{CsvProcessor, Value};
use crate::etl::field::Fields;
@@ -232,8 +286,9 @@ mod tests {
let values: HashMap<String, Value> = [("data".into(), Value::String("1,2".into()))]
.into_iter()
.collect();
let mut m = Map { values };
let result = processor.exec(Value::Map(Map { values })).unwrap();
processor.exec_map(&mut m).unwrap();
let values = [
("data".into(), Value::String("1,2".into())),
@@ -242,9 +297,9 @@ mod tests {
]
.into_iter()
.collect();
let expected = Value::Map(Map { values });
let expected = Map { values };
assert_eq!(expected, result);
assert_eq!(expected, m);
}
// test target_fields length larger than the record length
@@ -253,7 +308,7 @@ mod tests {
let values = [("data".into(), Value::String("1,2".into()))]
.into_iter()
.collect();
let input = Value::Map(Map { values });
let mut input = Map { values };
// with no empty value
{
@@ -261,7 +316,7 @@ mod tests {
let field = "data,, a,b,c".parse().unwrap();
processor.with_fields(Fields::one(field));
let result = processor.exec(input.clone()).unwrap();
processor.exec_map(&mut input).unwrap();
let values = [
("data".into(), Value::String("1,2".into())),
@@ -271,9 +326,9 @@ mod tests {
]
.into_iter()
.collect();
let expected = Value::Map(Map { values });
let expected = Map { values };
assert_eq!(expected, result);
assert_eq!(expected, input);
}
// with empty value
@@ -283,7 +338,7 @@ mod tests {
processor.with_fields(Fields::one(field));
processor.with_empty_value("default".into());
let result = processor.exec(input).unwrap();
processor.exec_map(&mut input).unwrap();
let values = [
("data".into(), Value::String("1,2".into())),
@@ -293,9 +348,9 @@ mod tests {
]
.into_iter()
.collect();
let expected = Value::Map(Map { values });
let expected = Map { values };
assert_eq!(expected, result);
assert_eq!(expected, input);
}
}
@@ -305,13 +360,13 @@ mod tests {
let values = [("data".into(), Value::String("1,2,3".into()))]
.into_iter()
.collect();
let input = Value::Map(Map { values });
let mut input = Map { values };
let mut processor = CsvProcessor::new();
let field = "data,,a,b".parse().unwrap();
processor.with_fields(Fields::one(field));
let result = processor.exec(input).unwrap();
processor.exec_map(&mut input).unwrap();
let values = [
("data".into(), Value::String("1,2,3".into())),
@@ -320,8 +375,8 @@ mod tests {
]
.into_iter()
.collect();
let expected = Value::Map(Map { values });
let expected = Map { values };
assert_eq!(expected, result);
assert_eq!(expected, input);
}
}


@@ -12,16 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use ahash::HashSet;
use chrono::{DateTime, NaiveDateTime};
use chrono_tz::Tz;
use lazy_static::lazy_static;
use crate::etl::field::{Field, Fields};
use crate::etl::processor::{
yaml_bool, yaml_field, yaml_fields, yaml_string, yaml_strings, Processor, FIELDS_NAME,
FIELD_NAME, IGNORE_MISSING_NAME,
update_one_one_output_keys, yaml_bool, yaml_field, yaml_fields, yaml_string, yaml_strings,
Processor, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME,
};
use crate::etl::value::{Map, Time, Value};
use crate::etl::value::{Map, Timestamp, Value};
pub(crate) const PROCESSOR_DATE: &str = "date";
@@ -31,7 +34,7 @@ const LOCALE_NAME: &str = "locale";
const OUTPUT_FORMAT_NAME: &str = "output_format"; // defaults to the input format
lazy_static! {
static ref DEFAULT_FORMATS: Vec<String> = vec![
static ref DEFAULT_FORMATS: Vec<Arc<String>> = vec![
// timezone with colon
"%Y-%m-%dT%H:%M:%S%:z",
"%Y-%m-%dT%H:%M:%S%.3f%:z",
@@ -50,15 +53,15 @@ lazy_static! {
"%Y-%m-%dT%H:%M:%S%.9f",
]
.iter()
.map(|s| s.to_string())
.map(|s| Arc::new(s.to_string()))
.collect();
}
#[derive(Debug, Default)]
struct Formats(Vec<String>);
struct Formats(Vec<Arc<String>>);
impl Formats {
fn new(mut formats: Vec<String>) -> Self {
fn new(mut formats: Vec<Arc<String>>) -> Self {
formats.sort();
formats.dedup();
Formats(formats)
@@ -66,21 +69,23 @@ impl Formats {
}
impl std::ops::Deref for Formats {
type Target = Vec<String>;
type Target = Vec<Arc<String>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// Deprecated: should be removed in the future.
/// Reserved for compatibility only.
#[derive(Debug, Default)]
pub struct DateProcessor {
fields: Fields,
formats: Formats,
timezone: Option<String>,
locale: Option<String>, // to support locale
output_format: Option<String>,
timezone: Option<Arc<String>>,
locale: Option<Arc<String>>, // to support locale
output_format: Option<Arc<String>>,
ignore_missing: bool,
// description
@@ -91,11 +96,12 @@ pub struct DateProcessor {
}
impl DateProcessor {
fn with_fields(&mut self, fields: Fields) {
fn with_fields(&mut self, mut fields: Fields) {
update_one_one_output_keys(&mut fields);
self.fields = fields
}
fn with_formats(&mut self, v: Option<Vec<String>>) {
fn with_formats(&mut self, v: Option<Vec<Arc<String>>>) {
let v = match v {
Some(v) if !v.is_empty() => v,
_ => DEFAULT_FORMATS.clone(),
@@ -107,19 +113,19 @@ impl DateProcessor {
fn with_timezone(&mut self, timezone: String) {
if !timezone.is_empty() {
self.timezone = Some(timezone);
self.timezone = Some(Arc::new(timezone));
}
}
fn with_locale(&mut self, locale: String) {
if !locale.is_empty() {
self.locale = Some(locale);
self.locale = Some(Arc::new(locale));
}
}
fn with_output_format(&mut self, output_format: String) {
if !output_format.is_empty() {
self.output_format = Some(output_format);
self.output_format = Some(Arc::new(output_format));
}
}
@@ -127,7 +133,7 @@ impl DateProcessor {
self.ignore_missing = ignore_missing;
}
fn parse(&self, val: &str) -> Result<Time, String> {
fn parse(&self, val: &str) -> Result<Timestamp, String> {
let mut tz = Tz::UTC;
if let Some(timezone) = &self.timezone {
tz = timezone.parse::<Tz>().map_err(|e| e.to_string())?;
@@ -135,10 +141,7 @@ impl DateProcessor {
for fmt in self.formats.iter() {
if let Ok(ns) = try_parse(val, fmt, tz) {
let mut t = Time::new(val, ns);
t.with_format(fmt);
t.with_timezone(self.timezone.clone());
return Ok(t);
return Ok(Timestamp::Nanosecond(ns));
}
}
@@ -146,12 +149,9 @@ impl DateProcessor {
}
fn process_field(&self, val: &str, field: &Field) -> Result<Map, String> {
let key = match field.target_field {
Some(ref target_field) => target_field,
None => field.get_field(),
};
let key = field.get_target_field();
Ok(Map::one(key, Value::Time(self.parse(val)?)))
Ok(Map::one(key, Value::Timestamp(self.parse(val)?)))
}
}
@@ -178,7 +178,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for DateProcessor {
FORMATS_NAME => {
let formats = yaml_strings(v, FORMATS_NAME)?;
formats_opt = Some(formats);
formats_opt = Some(formats.into_iter().map(Arc::new).collect());
}
TIMEZONE_NAME => {
processor.with_timezone(yaml_string(v, TIMEZONE_NAME)?);
@@ -217,6 +217,17 @@ impl Processor for DateProcessor {
&self.fields
}
fn fields_mut(&mut self) -> &mut Fields {
&mut self.fields
}
fn output_keys(&self) -> HashSet<String> {
self.fields
.iter()
.map(|f| f.get_target_field().to_string())
.collect()
}
fn exec_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
match val {
Value::String(s) => self.process_field(s, field),
@@ -226,6 +237,42 @@ impl Processor for DateProcessor {
)),
}
}
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<(), String> {
for field in self.fields().iter() {
let index = field.input_field.index;
match val.get(index) {
Some(Value::String(s)) => {
// TODO(qtang): Let this method use the intermediate state collection directly.
let mut map = self.process_field(s, field)?;
field
.output_fields_index_mapping
.iter()
.for_each(|(k, output_index)| {
if let Some(v) = map.remove(k) {
val[*output_index] = v;
}
});
}
Some(Value::Null) | None => {
if !self.ignore_missing {
return Err(format!(
"{} processor: missing field: {}",
self.kind(),
field.get_field_name()
));
}
}
Some(v) => {
return Err(format!(
"{} processor: expect string value, but got {v:?}",
self.kind()
));
}
}
}
Ok(())
}
}
/// Try to parse val with the timezone first; if that fails, parse without the timezone.
@@ -244,6 +291,8 @@ fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result<i64, String> {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use chrono_tz::Asia::Tokyo;
use crate::etl::processor::date::{try_parse, DateProcessor};
@@ -299,7 +348,7 @@ mod tests {
"%Y-%m-%dT%H:%M:%SZ",
]
.into_iter()
.map(|s| s.to_string())
.map(|s| Arc::new(s.to_string()))
.collect();
processor.with_formats(Some(formats));


@@ -12,8 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
use common_telemetry::warn;
use itertools::Itertools;
@@ -543,6 +542,35 @@ impl DissectProcessor {
Err("No matching pattern found".to_string())
}
/// Update the output keys for each field.
fn update_output_keys(&mut self) {
// every pattern has already been checked, so we can collect all the output keys
let output_keys = self
.patterns
.iter()
.flat_map(|pattern| pattern.iter())
.filter_map(|p| match p {
Part::Name(name) => {
if !name.is_empty()
&& !name.start_modifier.as_ref().is_some_and(|x| {
*x == StartModifier::NamedSkip || *x == StartModifier::MapVal
})
{
Some(name)
} else {
None
}
}
_ => None,
})
.collect::<Vec<_>>();
for field in self.fields.iter_mut() {
for k in &output_keys {
field.output_fields_index_mapping.insert(k.to_string(), 0);
}
}
}
}
impl TryFrom<&yaml_rust::yaml::Hash> for DissectProcessor {
@@ -576,7 +604,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for DissectProcessor {
_ => {}
}
}
processor.update_output_keys();
Ok(processor)
}
}
@@ -594,6 +622,24 @@ impl Processor for DissectProcessor {
&self.fields
}
fn fields_mut(&mut self) -> &mut Fields {
&mut self.fields
}
fn output_keys(&self) -> HashSet<String> {
let mut result = HashSet::with_capacity(30);
for pattern in &self.patterns {
for part in pattern.iter() {
if let Part::Name(name) = part {
if !name.is_empty() {
result.insert(name.to_string());
}
}
}
}
result
}
fn exec_field(&self, val: &Value, _field: &Field) -> Result<Map, String> {
match val {
Value::String(val) => match self.process(val) {
@@ -609,6 +655,46 @@ impl Processor for DissectProcessor {
)),
}
}
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<(), String> {
for field in self.fields.iter() {
let index = field.input_field.index;
match val.get(index) {
// TODO(qtang): Let this method use the intermediate state collection directly.
Some(Value::String(val_str)) => match self.process(val_str) {
Ok(mut map) => {
field
.output_fields_index_mapping
.iter()
.for_each(|(k, output_index)| {
if let Some(v) = map.remove(k) {
val[*output_index] = v
}
});
}
Err(e) => {
warn!("dissect processor: {}", e);
}
},
Some(Value::Null) | None => {
if !self.ignore_missing {
return Err(format!(
"{} processor: missing field: {}",
self.kind(),
field.get_field_name()
));
}
}
Some(v) => {
return Err(format!(
"{} processor: expect string value, but got {v:?}",
self.kind()
));
}
}
}
Ok(())
}
}
fn is_valid_char(ch: char) -> bool {
@@ -617,7 +703,7 @@ fn is_valid_char(ch: char) -> bool {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use ahash::HashMap;
use super::{DissectProcessor, EndModifier, Name, Part, Pattern, StartModifier};
use crate::etl::value::{Map, Value};


@@ -12,17 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use ahash::HashSet;
use crate::etl::field::{Field, Fields};
use crate::etl::processor::{
yaml_bool, yaml_field, yaml_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
IGNORE_MISSING_NAME,
update_one_one_output_keys, yaml_bool, yaml_field, yaml_fields, yaml_string, Processor,
FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME,
};
use crate::etl::value::time::{
MICROSECOND_RESOLUTION, MICRO_RESOLUTION, MILLISECOND_RESOLUTION, MILLI_RESOLUTION,
MS_RESOLUTION, NANOSECOND_RESOLUTION, NANO_RESOLUTION, NS_RESOLUTION, SECOND_RESOLUTION,
SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION,
};
use crate::etl::value::{Epoch, Map, Value};
use crate::etl::value::{Map, Timestamp, Value};
pub(crate) const PROCESSOR_EPOCH: &str = "epoch";
const RESOLUTION_NAME: &str = "resolution";
@@ -51,6 +53,8 @@ impl TryFrom<&str> for Resolution {
}
/// Supports string, integer, float, time, and epoch input.
/// Deprecated: should be removed in the future.
/// Reserved for compatibility only.
#[derive(Debug, Default)]
pub struct EpochProcessor {
fields: Fields,
@@ -64,7 +68,8 @@ pub struct EpochProcessor {
}
impl EpochProcessor {
fn with_fields(&mut self, fields: Fields) {
fn with_fields(&mut self, mut fields: Fields) {
update_one_one_output_keys(&mut fields);
self.fields = fields
}
@@ -76,9 +81,11 @@ impl EpochProcessor {
self.ignore_missing = ignore_missing;
}
fn parse(&self, val: &Value) -> Result<Epoch, String> {
fn parse(&self, val: &Value) -> Result<Timestamp, String> {
let t: i64 = match val {
Value::String(s) => s.parse::<i64>().map_err(|e| e.to_string())?,
Value::String(s) => s
.parse::<i64>()
.map_err(|e| format!("Failed to parse {} to number: {}", s, e))?,
Value::Int16(i) => *i as i64,
Value::Int32(i) => *i as i64,
Value::Int64(i) => *i,
@@ -89,20 +96,13 @@ impl EpochProcessor {
Value::Float32(f) => *f as i64,
Value::Float64(f) => *f as i64,
Value::Time(t) => match self.resolution {
Value::Timestamp(t) => match self.resolution {
Resolution::Second => t.timestamp(),
Resolution::Milli => t.timestamp_millis(),
Resolution::Micro => t.timestamp_micros(),
Resolution::Nano => t.timestamp_nanos(),
},
Value::Epoch(e) => match self.resolution {
Resolution::Second => e.timestamp(),
Resolution::Milli => e.timestamp_millis(),
Resolution::Micro => e.timestamp_micros(),
Resolution::Nano => e.timestamp_nanos(),
},
_ => {
return Err(format!(
"{PROCESSOR_EPOCH} processor: unsupported value {val}"
@@ -111,20 +111,17 @@ impl EpochProcessor {
};
match self.resolution {
Resolution::Second => Ok(Epoch::Second(t)),
Resolution::Milli => Ok(Epoch::Millisecond(t)),
Resolution::Micro => Ok(Epoch::Microsecond(t)),
Resolution::Nano => Ok(Epoch::Nanosecond(t)),
Resolution::Second => Ok(Timestamp::Second(t)),
Resolution::Milli => Ok(Timestamp::Millisecond(t)),
Resolution::Micro => Ok(Timestamp::Microsecond(t)),
Resolution::Nano => Ok(Timestamp::Nanosecond(t)),
}
}
fn process_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
let key = match field.target_field {
Some(ref target_field) => target_field,
None => field.get_field(),
};
let key = field.get_target_field();
Ok(Map::one(key, Value::Epoch(self.parse(val)?)))
Ok(Map::one(key, Value::Timestamp(self.parse(val)?)))
}
}
@@ -175,9 +172,50 @@ impl Processor for EpochProcessor {
&self.fields
}
fn fields_mut(&mut self) -> &mut Fields {
&mut self.fields
}
fn output_keys(&self) -> HashSet<String> {
self.fields
.iter()
.map(|f| f.get_target_field().to_string())
.collect()
}
fn exec_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
self.process_field(val, field)
}
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<(), String> {
for field in self.fields.iter() {
let index = field.input_field.index;
match val.get(index) {
Some(Value::Null) | None => {
if !self.ignore_missing {
return Err(format!(
"{} processor: missing field: {}",
self.kind(),
field.get_field_name()
));
}
}
Some(v) => {
// TODO(qtang): Let this method use the intermediate state collection directly.
let mut map = self.process_field(v, field)?;
field
.output_fields_index_mapping
.iter()
.for_each(|(k, output_index)| {
if let Some(v) = map.remove(k) {
val[*output_index] = v;
}
});
}
}
}
Ok(())
}
}
#[cfg(test)]
@@ -199,7 +237,7 @@ mod tests {
for value in values {
let parsed = processor.parse(&value).unwrap();
assert_eq!(parsed, super::Epoch::Second(1573840000));
assert_eq!(parsed, super::Timestamp::Second(1573840000));
}
}
}

View File

@@ -12,12 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use ahash::HashSet;
use regex::Regex;
use crate::etl::field::{Field, Fields};
use crate::etl::processor::{
yaml_bool, yaml_field, yaml_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
IGNORE_MISSING_NAME, PATTERN_NAME,
update_one_one_output_keys, yaml_bool, yaml_field, yaml_fields, yaml_string, Processor,
FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, PATTERN_NAME,
};
use crate::etl::value::{Array, Map, Value};
@@ -35,7 +36,8 @@ pub struct GsubProcessor {
}
impl GsubProcessor {
fn with_fields(&mut self, fields: Fields) {
fn with_fields(&mut self, mut fields: Fields) {
update_one_one_output_keys(&mut fields);
self.fields = fields;
}
@@ -74,19 +76,13 @@ impl GsubProcessor {
.to_string();
let val = Value::String(new_val);
let key = match field.target_field {
Some(ref target_field) => target_field,
None => field.get_field(),
};
let key = field.get_target_field();
Ok(Map::one(key, val))
}
fn process_array_field(&self, arr: &Array, field: &Field) -> Result<Map, String> {
let key = match field.target_field {
Some(ref target_field) => target_field,
None => field.get_field(),
};
let key = field.get_target_field();
let re = self.pattern.as_ref().unwrap();
let replacement = self.replacement.as_ref().unwrap();
@@ -160,6 +156,17 @@ impl crate::etl::processor::Processor for GsubProcessor {
&self.fields
}
fn fields_mut(&mut self) -> &mut Fields {
&mut self.fields
}
fn output_keys(&self) -> HashSet<String> {
self.fields
.iter()
.map(|f| f.get_target_field().to_string())
.collect()
}
fn exec_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
match val {
Value::String(val) => self.process_string_field(val, field),
@@ -170,6 +177,36 @@ impl crate::etl::processor::Processor for GsubProcessor {
)),
}
}
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<(), String> {
for field in self.fields.iter() {
let index = field.input_field.index;
match val.get(index) {
Some(Value::Null) | None => {
if !self.ignore_missing {
return Err(format!(
"{} processor: missing field: {}",
self.kind(),
field.get_field_name()
));
}
}
Some(v) => {
// TODO(qtang): Let this method use the intermediate state collection directly.
let mut map = self.exec_field(v, field)?;
field
.output_fields_index_mapping
.iter()
.for_each(|(k, output_index)| {
if let Some(v) = map.remove(k) {
val[*output_index] = v;
}
});
}
}
}
Ok(())
}
}
#[cfg(test)]

View File

@@ -12,10 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use ahash::HashSet;
use crate::etl::field::{Field, Fields};
use crate::etl::processor::{
yaml_bool, yaml_field, yaml_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
IGNORE_MISSING_NAME, SEPARATOR_NAME,
update_one_one_output_keys, yaml_bool, yaml_field, yaml_fields, yaml_string, Processor,
FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, SEPARATOR_NAME,
};
use crate::etl::value::{Array, Map, Value};
@@ -30,7 +32,8 @@ pub struct JoinProcessor {
}
impl JoinProcessor {
fn with_fields(&mut self, fields: Fields) {
fn with_fields(&mut self, mut fields: Fields) {
update_one_one_output_keys(&mut fields);
self.fields = fields;
}
@@ -43,10 +46,7 @@ impl JoinProcessor {
}
fn process_field(&self, arr: &Array, field: &Field) -> Result<Map, String> {
let key = match field.target_field {
Some(ref target_field) => target_field,
None => field.get_field(),
};
let key = field.get_target_field();
let sep = self.separator.as_ref().unwrap();
let val = arr
@@ -111,6 +111,17 @@ impl Processor for JoinProcessor {
self.ignore_missing
}
fn fields_mut(&mut self) -> &mut Fields {
&mut self.fields
}
fn output_keys(&self) -> HashSet<String> {
self.fields
.iter()
.map(|f| f.get_target_field().to_string())
.collect()
}
fn exec_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
match val {
Value::Array(arr) => self.process_field(arr, field),
@@ -120,6 +131,43 @@ impl Processor for JoinProcessor {
)),
}
}
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<(), String> {
for field in self.fields.iter() {
let index = field.input_field.index;
match val.get(index) {
Some(Value::Array(arr)) => {
// TODO(qtang): Let this method use the intermediate state collection directly.
let mut map = self.process_field(arr, field)?;
field
.output_fields_index_mapping
.iter()
.for_each(|(k, output_index)| {
if let Some(v) = map.remove(k) {
val[*output_index] = v;
}
});
}
Some(Value::Null) | None => {
if !self.ignore_missing {
return Err(format!(
"{} processor: missing field: {}",
self.kind(),
field.get_field_name()
));
}
}
Some(v) => {
return Err(format!(
"{} processor: expect string value, but got {v:?}",
self.kind()
));
}
}
}
Ok(())
}
}
#[cfg(test)]

View File

@@ -12,10 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use ahash::HashSet;
use crate::etl::field::{Field, Fields};
use crate::etl::processor::{
yaml_bool, yaml_field, yaml_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
IGNORE_MISSING_NAME, METHOD_NAME,
update_one_one_output_keys, yaml_bool, yaml_field, yaml_fields, yaml_string, Processor,
FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, METHOD_NAME,
};
use crate::etl::value::{Map, Value};
@@ -61,7 +63,8 @@ pub struct LetterProcessor {
}
impl LetterProcessor {
fn with_fields(&mut self, fields: Fields) {
fn with_fields(&mut self, mut fields: Fields) {
update_one_one_output_keys(&mut fields);
self.fields = fields;
}
@@ -81,10 +84,7 @@ impl LetterProcessor {
};
let val = Value::String(processed);
let key = match field.target_field {
Some(ref target_field) => target_field,
None => field.get_field(),
};
let key = field.get_target_field();
Ok(Map::one(key, val))
}
@@ -135,6 +135,17 @@ impl Processor for LetterProcessor {
&self.fields
}
fn fields_mut(&mut self) -> &mut Fields {
&mut self.fields
}
fn output_keys(&self) -> HashSet<String> {
self.fields
.iter()
.map(|f| f.get_target_field().to_string())
.collect()
}
fn exec_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
match val {
Value::String(val) => self.process_field(val, field),
@@ -144,6 +155,43 @@ impl Processor for LetterProcessor {
)),
}
}
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<(), String> {
for field in self.fields.iter() {
let index = field.input_field.index;
match val.get(index) {
Some(Value::String(s)) => {
// TODO(qtang): Let this method use the intermediate state collection directly.
let mut processed = self.process_field(s, field)?;
field
.output_fields_index_mapping
.iter()
.for_each(|(k, output_index)| {
if let Some(v) = processed.remove(k) {
val[*output_index] = v;
}
});
}
Some(Value::Null) | None => {
if !self.ignore_missing {
return Err(format!(
"{} processor: missing field: {}",
self.kind(),
field.get_field_name()
));
}
}
Some(v) => {
return Err(format!(
"{} processor: expect string value, but got {v:?}",
self.kind()
));
}
}
}
Ok(())
}
}
fn capitalize(s: &str) -> String {

View File

@@ -18,6 +18,7 @@ const PATTERNS_NAME: &str = "patterns";
pub(crate) const PROCESSOR_REGEX: &str = "regex";
use ahash::HashSet;
use lazy_static::lazy_static;
use regex::Regex;
@@ -117,6 +118,10 @@ impl RegexProcessor {
Ok(self)
}
fn generate_key(prefix: &str, group: &str) -> String {
format!("{prefix}_{group}")
}
fn process_field(&self, val: &str, field: &Field, gr: &GroupRegex) -> Result<Map, String> {
let mut map = Map::default();
@@ -124,12 +129,9 @@ impl RegexProcessor {
for group in &gr.groups {
if let Some(capture) = captures.name(group) {
let value = capture.as_str().to_string();
let prefix = match &field.target_field {
Some(s) => s,
None => &field.field,
};
let prefix = field.get_target_field();
let key = format!("{prefix}_{group}");
let key = Self::generate_key(prefix, group);
map.insert(key, Value::String(value));
}
@@ -138,6 +140,18 @@ impl RegexProcessor {
Ok(map)
}
fn update_output_keys(&mut self) {
for field in self.fields.iter_mut() {
for gr in &self.patterns {
for group in &gr.groups {
field
.output_fields_index_mapping
.insert(Self::generate_key(field.get_target_field(), group), 0_usize);
}
}
}
}
}
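// Standalone illustration of the regex processor's key scheme: each named capture
// group <g> on a field with target "a" produces the output key "a_g", matching the
// "a_ar" assertion in the tests below.
use regex::Regex;

fn main() {
    let re = Regex::new(r"(?P<ar>\d)").unwrap();
    let caps = re.captures("123").unwrap();
    let key = format!("{}_{}", "a", "ar"); // generate_key("a", "ar")
    assert_eq!(key, "a_ar");
    assert_eq!(caps.name("ar").unwrap().as_str(), "1");
}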
impl TryFrom<&yaml_rust::yaml::Hash> for RegexProcessor {
@@ -170,7 +184,10 @@ impl TryFrom<&yaml_rust::yaml::Hash> for RegexProcessor {
}
}
processor.check()
processor.check().map(|mut p| {
p.update_output_keys();
p
})
}
}
@@ -187,6 +204,23 @@ impl Processor for RegexProcessor {
&self.fields
}
fn fields_mut(&mut self) -> &mut Fields {
&mut self.fields
}
fn output_keys(&self) -> HashSet<String> {
self.fields
.iter()
.flat_map(|f| {
self.patterns.iter().flat_map(move |p| {
p.groups
.iter()
.map(move |g| Self::generate_key(&f.input_field.name, g))
})
})
.collect()
}
fn exec_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
match val {
Value::String(val) => {
@@ -203,6 +237,48 @@ impl Processor for RegexProcessor {
)),
}
}
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<(), String> {
for field in self.fields.iter() {
let index = field.input_field.index;
match val.get(index) {
Some(Value::String(s)) => {
let mut map = Map::default();
for gr in &self.patterns {
// TODO(qtang): Let this method use the intermediate state collection directly.
let m = self.process_field(s, field, gr)?;
map.extend(m);
}
field
.output_fields_index_mapping
.iter()
.for_each(|(k, output_index)| {
if let Some(v) = map.remove(k) {
val[*output_index] = v;
}
});
}
Some(Value::Null) | None => {
if !self.ignore_missing {
return Err(format!(
"{} processor: missing field: {}",
self.kind(),
field.get_field_name()
));
}
}
Some(v) => {
return Err(format!(
"{} processor: expect string value, but got {v:?}",
self.kind()
));
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
@@ -228,18 +304,18 @@ mod tests {
let mut map = Map::default();
map.insert("a", Value::String("123".to_string()));
let processed_val = processor.exec_map(map).unwrap();
processor.exec_map(&mut map).unwrap();
let v = Value::Map(Map {
let v = Map {
values: vec![
("a_ar".to_string(), Value::String("1".to_string())),
("a".to_string(), Value::String("123".to_string())),
]
.into_iter()
.collect(),
});
};
assert_eq!(v, processed_val);
assert_eq!(v, map);
}
#[test]
@@ -264,7 +340,7 @@ mod tests {
.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect();
let temporary_map = Map { values };
let mut temporary_map = Map { values };
{
// single field (with prefix), multiple patterns
@@ -287,9 +363,9 @@ mod tests {
let mut map = Map::default();
map.insert("breadcrumbs", breadcrumbs.clone());
let processed_val = processor.exec_map(map).unwrap();
processor.exec_map(&mut map).unwrap();
assert_eq!(processed_val, Value::Map(temporary_map.clone()));
assert_eq!(map, temporary_map);
}
{
@@ -337,11 +413,11 @@ mod tests {
.map(|(k, v)| (k.to_string(), v))
.collect();
let actual_val = processor.exec_map(temporary_map.clone()).unwrap();
let mut expected_map = temporary_map.clone();
processor.exec_map(&mut temporary_map).unwrap();
expected_map.extend(Map { values: new_values });
assert_eq!(Value::Map(expected_map), actual_val);
assert_eq!(expected_map, temporary_map);
}
}
}

View File

@@ -0,0 +1,440 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use ahash::HashSet;
use chrono::{DateTime, NaiveDateTime};
use chrono_tz::Tz;
use lazy_static::lazy_static;
use super::yaml_strings;
use crate::etl::field::{Field, Fields};
use crate::etl::processor::{
update_one_one_output_keys, yaml_bool, yaml_field, yaml_fields, yaml_string, Processor,
FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME,
};
use crate::etl::value::time::{
MICROSECOND_RESOLUTION, MICRO_RESOLUTION, MILLISECOND_RESOLUTION, MILLI_RESOLUTION,
MS_RESOLUTION, NANOSECOND_RESOLUTION, NANO_RESOLUTION, NS_RESOLUTION, SECOND_RESOLUTION,
SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION,
};
use crate::etl::value::{Map, Timestamp, Value};
pub(crate) const PROCESSOR_TIMESTAMP: &str = "timestamp";
const RESOLUTION_NAME: &str = "resolution";
const FORMATS_NAME: &str = "formats"; // defaults to RFC3339-style formats
lazy_static! {
static ref DEFAULT_FORMATS: Vec<(Arc<String>, Tz)> = vec![
// timezone with colon
"%Y-%m-%dT%H:%M:%S%:z",
"%Y-%m-%dT%H:%M:%S%.3f%:z",
"%Y-%m-%dT%H:%M:%S%.6f%:z",
"%Y-%m-%dT%H:%M:%S%.9f%:z",
// timezone without colon
"%Y-%m-%dT%H:%M:%S%z",
"%Y-%m-%dT%H:%M:%S%.3f%z",
"%Y-%m-%dT%H:%M:%S%.6f%z",
"%Y-%m-%dT%H:%M:%S%.9f%z",
// without timezone
"%Y-%m-%dT%H:%M:%SZ",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%dT%H:%M:%S%.3f",
"%Y-%m-%dT%H:%M:%S%.6f",
"%Y-%m-%dT%H:%M:%S%.9f",
]
.iter()
.map(|s| (Arc::new(s.to_string()), Tz::UCT))
.collect();
}
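// The default formats cover RFC3339-style strings with and without fractional
// seconds and timezone. A standalone sanity check against chrono, reusing the
// instant from the integration test later in this commit (2024-06-27T06:13:36.991Z):
use chrono::DateTime;

fn main() {
    let dt = DateTime::parse_from_str("2024-06-27T06:13:36.991+00:00", "%Y-%m-%dT%H:%M:%S%.3f%:z")
        .expect("format should match");
    assert_eq!(dt.timestamp_millis(), 1_719_468_816_991);
}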
#[derive(Debug, Default)]
enum Resolution {
Second,
#[default]
Milli,
Micro,
Nano,
}
impl TryFrom<&str> for Resolution {
type Error = String;
fn try_from(s: &str) -> Result<Self, Self::Error> {
match s {
SECOND_RESOLUTION | SEC_RESOLUTION | S_RESOLUTION => Ok(Resolution::Second),
MILLISECOND_RESOLUTION | MILLI_RESOLUTION | MS_RESOLUTION => Ok(Resolution::Milli),
MICROSECOND_RESOLUTION | MICRO_RESOLUTION | US_RESOLUTION => Ok(Resolution::Micro),
NANOSECOND_RESOLUTION | NANO_RESOLUTION | NS_RESOLUTION => Ok(Resolution::Nano),
_ => Err(format!("invalid resolution: {s}")),
}
}
}
#[derive(Debug)]
struct Formats(Vec<(Arc<String>, Tz)>);
impl Formats {
fn new(mut formats: Vec<(Arc<String>, Tz)>) -> Self {
formats.sort_by_key(|(key, _)| key.clone());
formats.dedup();
Formats(formats)
}
}
impl Default for Formats {
fn default() -> Self {
Formats(DEFAULT_FORMATS.clone())
}
}
impl std::ops::Deref for Formats {
type Target = Vec<(Arc<String>, Tz)>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// support string, integer, float, time, epoch
#[derive(Debug, Default)]
pub struct TimestampProcessor {
fields: Fields,
formats: Formats,
resolution: Resolution,
ignore_missing: bool,
// Options not yet supported:
// description, if, ignore_failure, on_failure, tag
}
impl TimestampProcessor {
fn with_fields(&mut self, mut fields: Fields) {
update_one_one_output_keys(&mut fields);
self.fields = fields
}
fn with_resolution(&mut self, resolution: Resolution) {
self.resolution = resolution;
}
fn with_formats(&mut self, v: Option<Vec<(Arc<String>, Tz)>>) {
let v = match v {
Some(v) if !v.is_empty() => v,
_ => DEFAULT_FORMATS.clone(),
};
let formats = Formats::new(v);
self.formats = formats;
}
fn with_ignore_missing(&mut self, ignore_missing: bool) {
self.ignore_missing = ignore_missing;
}
/// Try to parse `val` as a timezone-aware timestamp first; if that fails, parse it as a naive timestamp and apply the configured timezone.
fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result<i64, String> {
if let Ok(dt) = DateTime::parse_from_str(val, fmt) {
Ok(dt.timestamp_nanos_opt().ok_or("failed to get timestamp")?)
} else {
let dt = NaiveDateTime::parse_from_str(val, fmt)
.map_err(|e| e.to_string())?
.and_local_timezone(tz)
.single()
.ok_or("failed to get local timezone")?;
Ok(dt.timestamp_nanos_opt().ok_or("failed to get timestamp")?)
}
}
fn parse_time_str(&self, val: &str) -> Result<i64, String> {
for (fmt, tz) in self.formats.iter() {
if let Ok(ns) = Self::try_parse(val, fmt, *tz) {
return Ok(ns);
}
}
Err(format!("{} processor: failed to parse {val}", self.kind()))
}
fn parse(&self, val: &Value) -> Result<Timestamp, String> {
let t: i64 = match val {
Value::String(s) => {
let t = s.parse::<i64>();
match t {
Ok(t) => t,
Err(_) => {
let ns = self.parse_time_str(s)?;
return Ok(Timestamp::Nanosecond(ns));
}
}
}
Value::Int16(i) => *i as i64,
Value::Int32(i) => *i as i64,
Value::Int64(i) => *i,
Value::Uint8(i) => *i as i64,
Value::Uint16(i) => *i as i64,
Value::Uint32(i) => *i as i64,
Value::Uint64(i) => *i as i64,
Value::Float32(f) => *f as i64,
Value::Float64(f) => *f as i64,
Value::Timestamp(e) => match self.resolution {
Resolution::Second => e.timestamp(),
Resolution::Milli => e.timestamp_millis(),
Resolution::Micro => e.timestamp_micros(),
Resolution::Nano => e.timestamp_nanos(),
},
_ => {
return Err(format!(
"{PROCESSOR_TIMESTAMP} processor: unsupported value {val}"
))
}
};
match self.resolution {
Resolution::Second => Ok(Timestamp::Second(t)),
Resolution::Milli => Ok(Timestamp::Millisecond(t)),
Resolution::Micro => Ok(Timestamp::Microsecond(t)),
Resolution::Nano => Ok(Timestamp::Nanosecond(t)),
}
}
fn process_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
let key = field.get_target_field();
Ok(Map::one(key, Value::Timestamp(self.parse(val)?)))
}
}
fn parse_formats(yaml: &yaml_rust::yaml::Yaml) -> Result<Vec<(Arc<String>, Tz)>, String> {
return match yaml.as_vec() {
Some(formats_yaml) => {
let mut formats = Vec::with_capacity(formats_yaml.len());
for v in formats_yaml {
let s = yaml_strings(v, FORMATS_NAME)
.or(yaml_string(v, FORMATS_NAME).map(|s| vec![s]))?;
if s.len() != 1 && s.len() != 2 {
return Err(format!(
"{PROCESSOR_TIMESTAMP} processor: invalid format {s:?}"
));
}
let mut iter = s.into_iter();
// safety: the length check above guarantees at least one element
let formatter = iter.next().unwrap();
let tz = iter
.next()
.map(|tz| tz.parse::<Tz>())
.unwrap_or(Ok(Tz::UTC))
.map_err(|e| e.to_string())?;
formats.push((Arc::new(formatter), tz));
}
Ok(formats)
}
None => Err(format!(
"{PROCESSOR_TIMESTAMP} processor: invalid format {yaml:?}"
)),
};
}
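// parse_formats accepts two YAML shapes per entry: a bare format string (UTC is
// assumed, matching the Tz::UTC fallback above) or a [format, timezone] pair, as the
// tests at the bottom of this file exercise. A standalone illustration:
use yaml_rust::YamlLoader;

fn main() {
    let docs = YamlLoader::load_from_str(
        r#"
formats:
  - "%Y-%m-%dT%H:%M:%S%:z"
  - ["%Y-%m-%dT%H:%M:%S", "Asia/Tokyo"]
"#,
    )
    .unwrap();
    let formats = docs[0]["formats"].as_vec().unwrap();
    assert_eq!(formats.len(), 2);
    assert!(formats[0].as_str().is_some()); // bare string entry
    assert_eq!(formats[1].as_vec().unwrap().len(), 2); // [format, timezone] entry
}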
impl TryFrom<&yaml_rust::yaml::Hash> for TimestampProcessor {
type Error = String;
fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
let mut processor = TimestampProcessor::default();
for (k, v) in hash {
let key = k
.as_str()
.ok_or(format!("key must be a string, but got {k:?}"))?;
match key {
FIELD_NAME => {
processor.with_fields(Fields::one(yaml_field(v, FIELD_NAME)?));
}
FIELDS_NAME => {
processor.with_fields(yaml_fields(v, FIELDS_NAME)?);
}
FORMATS_NAME => {
let formats = parse_formats(v)?;
processor.with_formats(Some(formats));
}
RESOLUTION_NAME => {
let s = yaml_string(v, RESOLUTION_NAME)?.as_str().try_into()?;
processor.with_resolution(s);
}
IGNORE_MISSING_NAME => {
processor.with_ignore_missing(yaml_bool(v, IGNORE_MISSING_NAME)?);
}
_ => {}
}
}
Ok(processor)
}
}
impl Processor for TimestampProcessor {
fn kind(&self) -> &str {
PROCESSOR_TIMESTAMP
}
fn ignore_missing(&self) -> bool {
self.ignore_missing
}
fn fields(&self) -> &Fields {
&self.fields
}
fn fields_mut(&mut self) -> &mut Fields {
&mut self.fields
}
fn output_keys(&self) -> HashSet<String> {
self.fields
.iter()
.map(|f| f.get_target_field().to_string())
.collect()
}
fn exec_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
self.process_field(val, field)
}
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<(), String> {
for field in self.fields.iter() {
let index = field.input_field.index;
match val.get(index) {
Some(Value::Null) | None => {
if !self.ignore_missing {
return Err(format!(
"{} processor: missing field: {}",
self.kind(),
field.get_field_name()
));
}
}
Some(v) => {
// TODO(qtang): Let this method use the intermediate state collection directly.
let mut map = self.process_field(v, field)?;
field
.output_fields_index_mapping
.iter()
.for_each(|(k, output_index)| {
if let Some(v) = map.remove(k) {
val[*output_index] = v;
}
});
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use yaml_rust::YamlLoader;
use super::TimestampProcessor;
use crate::etl::value::{Timestamp, Value};
#[test]
fn test_parse_epoch() {
let processor_yaml_str = r#"fields:
- hello
resolution: s
formats:
- "%Y-%m-%dT%H:%M:%S%:z"
- "%Y-%m-%dT%H:%M:%S%.3f%:z"
- "%Y-%m-%dT%H:%M:%S"
- "%Y-%m-%dT%H:%M:%SZ"
"#;
let yaml = &YamlLoader::load_from_str(processor_yaml_str).unwrap()[0];
let timestamp_yaml = yaml.as_hash().unwrap();
let processor = TimestampProcessor::try_from(timestamp_yaml).unwrap();
let values = [
(
Value::String("1573840000".into()),
Timestamp::Second(1573840000),
),
(Value::Int32(1573840001), Timestamp::Second(1573840001)),
(Value::Uint64(1573840002), Timestamp::Second(1573840002)),
// f32 lacks the precision to represent this timestamp exactly:
// 1573840003.0_f32 as i64 is 1573840000
// (Value::Float32(1573840003.0), Timestamp::Second(1573840003)),
(
Value::String("2019-11-15T17:46:40Z".into()),
Timestamp::Nanosecond(1573840000000000000),
),
];
for (value, result) in values {
let parsed = processor.parse(&value).unwrap();
assert_eq!(parsed, result);
}
let values: Vec<&str> = vec![
"2014-5-17T12:34:56",
"2014-5-17T12:34:56Z",
"2014-5-17T12:34:56+09:30",
"2014-5-17T12:34:56.000+09:30",
"2014-5-17T12:34:56-0930",
"2014-5-17T12:34:56.000-0930",
]
.into_iter()
.collect();
for value in values {
let parsed = processor.parse(&Value::String(value.into()));
assert!(parsed.is_ok());
}
}
#[test]
fn test_parse_with_timezone() {
let processor_yaml_str = r#"fields:
- hello
resolution: s
formats:
- ["%Y-%m-%dT%H:%M:%S%:z", "Asia/Tokyo"]
- ["%Y-%m-%dT%H:%M:%S%.3f%:z", "Asia/Tokyo"]
- ["%Y-%m-%dT%H:%M:%S", "Asia/Tokyo"]
- ["%Y-%m-%dT%H:%M:%SZ", "Asia/Tokyo"]
"#;
let yaml = &YamlLoader::load_from_str(processor_yaml_str).unwrap()[0];
let timestamp_yaml = yaml.as_hash().unwrap();
let processor = TimestampProcessor::try_from(timestamp_yaml).unwrap();
let values: Vec<&str> = vec![
"2014-5-17T12:34:56",
"2014-5-17T12:34:56Z",
"2014-5-17T12:34:56+09:30",
"2014-5-17T12:34:56.000+09:30",
"2014-5-17T12:34:56-0930",
"2014-5-17T12:34:56.000-0930",
]
.into_iter()
.collect();
for value in values {
let parsed = processor.parse(&Value::String(value.into()));
assert!(parsed.is_ok());
}
}
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use ahash::HashSet;
use urlencoding::{decode, encode};
use crate::etl::field::{Field, Fields};
@@ -60,7 +61,8 @@ pub struct UrlEncodingProcessor {
}
impl UrlEncodingProcessor {
fn with_fields(&mut self, fields: Fields) {
fn with_fields(&mut self, mut fields: Fields) {
Self::update_output_keys(&mut fields);
self.fields = fields;
}
@@ -79,13 +81,18 @@ impl UrlEncodingProcessor {
};
let val = Value::String(processed);
let key = match field.target_field {
Some(ref target_field) => target_field,
None => field.get_field(),
};
let key = field.get_target_field();
Ok(Map::one(key, val))
}
fn update_output_keys(fields: &mut Fields) {
for field in fields.iter_mut() {
field
.output_fields_index_mapping
.insert(field.get_target_field().to_string(), 0_usize);
}
}
}
impl TryFrom<&yaml_rust::yaml::Hash> for UrlEncodingProcessor {
@@ -136,6 +143,17 @@ impl crate::etl::processor::Processor for UrlEncodingProcessor {
&self.fields
}
fn fields_mut(&mut self) -> &mut Fields {
&mut self.fields
}
fn output_keys(&self) -> HashSet<String> {
self.fields
.iter()
.map(|f| f.get_target_field().to_string())
.collect()
}
fn exec_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
match val {
Value::String(val) => self.process_field(val, field),
@@ -145,6 +163,41 @@ impl crate::etl::processor::Processor for UrlEncodingProcessor {
)),
}
}
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<(), String> {
for field in self.fields.iter() {
let index = field.input_field.index;
match val.get(index) {
Some(Value::String(s)) => {
let mut map = self.process_field(s, field)?;
field
.output_fields_index_mapping
.iter()
.for_each(|(k, output_index)| {
if let Some(v) = map.remove(k) {
val[*output_index] = v;
}
});
}
Some(Value::Null) | None => {
if !self.ignore_missing {
return Err(format!(
"{} processor: missing field: {}",
self.kind(),
field.get_field_name()
));
}
}
Some(v) => {
return Err(format!(
"{} processor: expect string value, but got {v:?}",
self.kind()
));
}
}
}
Ok(())
}
}
#[cfg(test)]

View File

@@ -18,7 +18,7 @@ pub mod transformer;
use itertools::Itertools;
use crate::etl::field::Fields;
use crate::etl::processor::{yaml_field, yaml_fields, yaml_string};
use crate::etl::processor::{update_one_one_output_keys, yaml_field, yaml_fields, yaml_string};
use crate::etl::transform::index::Index;
use crate::etl::value::Value;
@@ -30,13 +30,17 @@ const TRANSFORM_DEFAULT: &str = "default";
const TRANSFORM_ON_FAILURE: &str = "on_failure";
pub use transformer::greptime::GreptimeTransformer;
// pub use transformer::noop::NoopTransformer;
pub trait Transformer: std::fmt::Display + Sized + Send + Sync + 'static {
type Output;
type VecOutput;
fn new(transforms: Transforms) -> Result<Self, String>;
fn transform(&self, val: crate::etl::value::Value) -> Result<Self::Output, String>;
fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema>;
fn transforms(&self) -> &Transforms;
fn transforms_mut(&mut self) -> &mut Transforms;
fn transform(&self, val: Value) -> Result<Self::Output, String>;
fn transform_mut(&self, val: &mut Vec<Value>) -> Result<Self::VecOutput, String>;
}
/// On Failure behavior when transform fails
@@ -74,6 +78,30 @@ impl std::fmt::Display for OnFailure {
#[derive(Debug, Default, Clone)]
pub struct Transforms {
transforms: Vec<Transform>,
output_keys: Vec<String>,
required_keys: Vec<String>,
}
impl Transforms {
pub fn output_keys(&self) -> &Vec<String> {
&self.output_keys
}
pub fn output_keys_mut(&mut self) -> &mut Vec<String> {
&mut self.output_keys
}
pub fn required_keys_mut(&mut self) -> &mut Vec<String> {
&mut self.required_keys
}
pub fn required_keys(&self) -> &Vec<String> {
&self.required_keys
}
pub fn transforms(&self) -> &Vec<Transform> {
&self.transforms
}
}
impl std::fmt::Display for Transforms {
@@ -106,17 +134,38 @@ impl TryFrom<&Vec<yaml_rust::Yaml>> for Transforms {
type Error = String;
fn try_from(docs: &Vec<yaml_rust::Yaml>) -> Result<Self, Self::Error> {
let mut transforms = vec![];
let mut transforms = Vec::with_capacity(100);
let mut all_output_keys: Vec<String> = Vec::with_capacity(100);
let mut all_required_keys = Vec::with_capacity(100);
for doc in docs {
let transform: Transform = doc
.as_hash()
.ok_or("transform element must be a map".to_string())?
.try_into()?;
let mut transform_output_keys = transform
.fields
.iter()
.map(|f| f.get_target_field().to_string())
.collect();
all_output_keys.append(&mut transform_output_keys);
let mut transform_required_keys = transform
.fields
.iter()
.map(|f| f.input_field.name.clone())
.collect();
all_required_keys.append(&mut transform_required_keys);
transforms.push(transform);
}
Ok(Transforms { transforms })
all_required_keys.sort();
Ok(Transforms {
transforms,
output_keys: all_output_keys,
required_keys: all_required_keys,
})
}
}
@@ -173,7 +222,8 @@ impl Default for Transform {
}
impl Transform {
fn with_fields(&mut self, fields: Fields) {
fn with_fields(&mut self, mut fields: Fields) {
update_one_one_output_keys(&mut fields);
self.fields = fields;
}

View File

@@ -13,12 +13,14 @@
// limitations under the License.
const INDEX_TIMESTAMP: &str = "timestamp";
const INDEX_TIMEINDEX: &str = "time";
const INDEX_TAG: &str = "tag";
const INDEX_FULLTEXT: &str = "fulltext";
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[allow(clippy::enum_variant_names)]
pub enum Index {
Timestamp,
Time,
Tag,
Fulltext,
}
@@ -26,7 +28,7 @@ pub enum Index {
impl std::fmt::Display for Index {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let index = match self {
Index::Timestamp => INDEX_TIMESTAMP,
Index::Time => INDEX_TIMEINDEX,
Index::Tag => INDEX_TAG,
Index::Fulltext => INDEX_FULLTEXT,
};
@@ -48,7 +50,7 @@ impl TryFrom<&str> for Index {
fn try_from(value: &str) -> Result<Self, Self::Error> {
match value {
INDEX_TIMESTAMP => Ok(Index::Timestamp),
INDEX_TIMESTAMP | INDEX_TIMEINDEX => Ok(Index::Time),
INDEX_TAG => Ok(Index::Tag),
INDEX_FULLTEXT => Ok(Index::Fulltext),
_ => Err(format!("unsupported index type: {}", value)),
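// "timestamp" is kept as an accepted spelling for backwards compatibility; both names
// now resolve to the same variant. A standalone mirror of the TryFrom logic:
#[derive(Debug, PartialEq)]
enum IndexKind { Time, Tag, Fulltext }

fn parse_index(s: &str) -> Result<IndexKind, String> {
    match s {
        "timestamp" | "time" => Ok(IndexKind::Time),
        "tag" => Ok(IndexKind::Tag),
        "fulltext" => Ok(IndexKind::Fulltext),
        other => Err(format!("unsupported index type: {other}")),
    }
}

fn main() {
    assert_eq!(parse_index("timestamp"), parse_index("time"));
    assert!(parse_index("bogus").is_err());
}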

View File

@@ -13,4 +13,3 @@
// limitations under the License.
pub mod greptime;
pub mod noop;

View File

@@ -23,7 +23,7 @@ use itertools::Itertools;
use crate::etl::field::{Field, Fields};
use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transformer, Transforms};
use crate::etl::value::{Array, Epoch, Map, Value};
use crate::etl::value::{Array, Map, Timestamp, Value};
const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
@@ -32,46 +32,62 @@ const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
transforms: Transforms,
schema: Vec<ColumnSchema>,
}
impl GreptimeTransformer {
fn default_greptime_timestamp_column() -> Transform {
let ns = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0);
let type_ = Value::Epoch(Epoch::Nanosecond(ns));
let type_ = Value::Timestamp(Timestamp::Nanosecond(ns));
let default = Some(type_.clone());
let field = Field::new(DEFAULT_GREPTIME_TIMESTAMP_COLUMN);
let mut field = Field::new(DEFAULT_GREPTIME_TIMESTAMP_COLUMN);
field.insert_output_index(DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string(), 0);
let fields = Fields::new(vec![field]).unwrap();
Transform {
fields,
type_,
default,
index: Some(Index::Timestamp),
on_failure: None,
index: Some(Index::Time),
on_failure: Some(crate::etl::transform::OnFailure::Default),
}
}
fn schemas(&self) -> Result<Vec<ColumnSchema>, String> {
fn schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>, String> {
let mut schema = vec![];
for transform in self.transforms.iter() {
for transform in transforms.iter() {
schema.extend(coerce_columns(transform)?);
}
Ok(schema)
}
fn transform_map(&self, map: &Map) -> Result<Row, String> {
let mut values = vec![];
let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
for transform in self.transforms.iter() {
for field in transform.fields.iter() {
let value_data = match map.get(field.get_field()) {
let value_data = match map.get(field.get_field_name()) {
Some(val) => coerce_value(val, transform)?,
None if transform.get_default().is_some() => {
coerce_value(transform.get_default().unwrap(), transform)?
None => {
let default = transform.get_default();
match default {
Some(default) => coerce_value(default, transform)?,
None => None,
}
}
None => None,
};
values.push(GreptimeValue { value_data });
if let Some(i) = field
.output_fields_index_mapping
.iter()
.next()
.map(|kv| kv.1)
{
values[*i] = GreptimeValue { value_data }
} else {
return Err(format!(
"field: {} output_fields is empty.",
field.get_field_name()
));
}
}
}
@@ -79,7 +95,7 @@ impl GreptimeTransformer {
}
fn transform_array(&self, arr: &Array) -> Result<Vec<Row>, String> {
let mut rows = vec![];
let mut rows = Vec::with_capacity(arr.len());
for v in arr.iter() {
match v {
Value::Map(map) => {
@@ -101,6 +117,7 @@ impl std::fmt::Display for GreptimeTransformer {
impl Transformer for GreptimeTransformer {
type Output = Rows;
type VecOutput = Row;
fn new(mut transforms: Transforms) -> Result<Self, String> {
if transforms.is_empty() {
@@ -128,9 +145,9 @@ impl Transformer for GreptimeTransformer {
column_names_set.extend(target_fields_set);
if let Some(idx) = transform.index {
if idx == Index::Timestamp {
if idx == Index::Time {
match transform.fields.len() {
1 => timestamp_columns.push(transform.fields.first().unwrap().get_field()),
1 => timestamp_columns.push(transform.fields.first().unwrap().get_field_name()),
_ => return Err(format!(
"Illegal to set multiple timestamp Index columns, please set only one: {}",
transform.fields.get_target_fields().join(", ")
@@ -143,9 +160,20 @@ impl Transformer for GreptimeTransformer {
match timestamp_columns.len() {
0 => {
transforms.push(GreptimeTransformer::default_greptime_timestamp_column());
Ok(GreptimeTransformer { transforms })
let required_keys = transforms.required_keys_mut();
required_keys.push(DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string());
let output_keys = transforms.output_keys_mut();
output_keys.push(DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string());
let schema = GreptimeTransformer::schemas(&transforms)?;
Ok(GreptimeTransformer { transforms, schema })
}
1 => {
let schema = GreptimeTransformer::schemas(&transforms)?;
Ok(GreptimeTransformer { transforms, schema })
}
1 => Ok(GreptimeTransformer { transforms }),
_ => {
let columns: String = timestamp_columns.iter().map(|s| s.to_string()).join(", ");
let count = timestamp_columns.len();
@@ -157,17 +185,69 @@ impl Transformer for GreptimeTransformer {
}
fn transform(&self, value: Value) -> Result<Self::Output, String> {
let schema = self.schemas()?;
match value {
Value::Map(map) => {
let rows = vec![self.transform_map(&map)?];
Ok(Rows { schema, rows })
Ok(Rows {
schema: self.schema.clone(),
rows,
})
}
Value::Array(arr) => {
let rows = self.transform_array(&arr)?;
Ok(Rows { schema, rows })
Ok(Rows {
schema: self.schema.clone(),
rows,
})
}
_ => Err(format!("Expected map or array, found: {}", value)),
}
}
fn transform_mut(&self, val: &mut Vec<Value>) -> Result<Self::VecOutput, String> {
let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
for transform in self.transforms.iter() {
for field in transform.fields.iter() {
let index = field.input_field.index;
match val.get(index) {
Some(v) => {
let value_data = coerce_value(v, transform)
.map_err(|e| format!("{} processor: {}", field.get_field_name(), e))?;
// every transform field has exactly one output field
if let Some(i) = field
.output_fields_index_mapping
.iter()
.next()
.map(|kv| kv.1)
{
values[*i] = GreptimeValue { value_data }
} else {
return Err(format!(
"field: {} output_fields is empty.",
field.get_field_name()
));
}
}
_ => {
return Err(format!(
"Get field not in the array field: {field:?}, {val:?}"
))
}
}
}
}
Ok(Row { values })
}
fn transforms(&self) -> &Transforms {
&self.transforms
}
fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
&self.schema
}
fn transforms_mut(&mut self) -> &mut Transforms {
&mut self.transforms
}
}
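// The ordering fix from this PR in miniature: rows are pre-filled with nulls sized to
// the schema, and each coerced value is written at its precomputed column index, so
// output column order matches the schema no matter which field produced a value first.
#[derive(Clone, Debug, PartialEq)]
struct GreptimeValueSketch { value_data: Option<i64> } // stand-in for the proto GreptimeValue

fn main() {
    let schema_len = 3;
    let mut row = vec![GreptimeValueSketch { value_data: None }; schema_len];
    // the field for column 2 happens to be processed before the one for column 0:
    row[2] = GreptimeValueSketch { value_data: Some(7) };
    row[0] = GreptimeValueSketch { value_data: Some(1) };
    assert_eq!(
        row.iter().map(|v| v.value_data).collect::<Vec<_>>(),
        vec![Some(1), None, Some(7)]
    );
}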

View File

@@ -20,7 +20,7 @@ use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
use crate::etl::transform::index::Index;
use crate::etl::transform::{OnFailure, Transform};
use crate::etl::value::{Epoch, Time, Value};
use crate::etl::value::{Timestamp, Value};
impl TryFrom<Value> for ValueData {
type Error = String;
@@ -43,14 +43,18 @@ impl TryFrom<Value> for ValueData {
Value::Float64(v) => Ok(ValueData::F64Value(v)),
Value::Boolean(v) => Ok(ValueData::BoolValue(v)),
Value::String(v) => Ok(ValueData::StringValue(v.clone())),
Value::String(v) => Ok(ValueData::StringValue(v)),
Value::Time(Time { nanosecond, .. }) => Ok(ValueData::TimeNanosecondValue(nanosecond)),
Value::Epoch(Epoch::Nanosecond(ns)) => Ok(ValueData::TimestampNanosecondValue(ns)),
Value::Epoch(Epoch::Microsecond(us)) => Ok(ValueData::TimestampMicrosecondValue(us)),
Value::Epoch(Epoch::Millisecond(ms)) => Ok(ValueData::TimestampMillisecondValue(ms)),
Value::Epoch(Epoch::Second(s)) => Ok(ValueData::TimestampSecondValue(s)),
Value::Timestamp(Timestamp::Nanosecond(ns)) => {
Ok(ValueData::TimestampNanosecondValue(ns))
}
Value::Timestamp(Timestamp::Microsecond(us)) => {
Ok(ValueData::TimestampMicrosecondValue(us))
}
Value::Timestamp(Timestamp::Millisecond(ms)) => {
Ok(ValueData::TimestampMillisecondValue(ms))
}
Value::Timestamp(Timestamp::Second(s)) => Ok(ValueData::TimestampSecondValue(s)),
Value::Array(_) => unimplemented!("Array type not supported"),
Value::Map(_) => unimplemented!("Object type not supported"),
@@ -85,7 +89,7 @@ pub(crate) fn coerce_columns(transform: &Transform) -> Result<Vec<ColumnSchema>,
fn coerce_semantic_type(transform: &Transform) -> SemanticType {
match transform.index {
Some(Index::Tag) => SemanticType::Tag,
Some(Index::Timestamp) => SemanticType::Timestamp,
Some(Index::Time) => SemanticType::Timestamp,
Some(Index::Fulltext) | None => SemanticType::Field,
}
}
@@ -120,12 +124,10 @@ fn coerce_type(transform: &Transform) -> Result<ColumnDataType, String> {
Value::Boolean(_) => Ok(ColumnDataType::Boolean),
Value::String(_) => Ok(ColumnDataType::String),
Value::Time(_) => Ok(ColumnDataType::TimestampNanosecond),
Value::Epoch(Epoch::Nanosecond(_)) => Ok(ColumnDataType::TimestampNanosecond),
Value::Epoch(Epoch::Microsecond(_)) => Ok(ColumnDataType::TimestampMicrosecond),
Value::Epoch(Epoch::Millisecond(_)) => Ok(ColumnDataType::TimestampMillisecond),
Value::Epoch(Epoch::Second(_)) => Ok(ColumnDataType::TimestampSecond),
Value::Timestamp(Timestamp::Nanosecond(_)) => Ok(ColumnDataType::TimestampNanosecond),
Value::Timestamp(Timestamp::Microsecond(_)) => Ok(ColumnDataType::TimestampMicrosecond),
Value::Timestamp(Timestamp::Millisecond(_)) => Ok(ColumnDataType::TimestampMillisecond),
Value::Timestamp(Timestamp::Second(_)) => Ok(ColumnDataType::TimestampSecond),
Value::Array(_) => unimplemented!("Array"),
Value::Map(_) => unimplemented!("Object"),
@@ -142,7 +144,16 @@ pub(crate) fn coerce_value(
transform: &Transform,
) -> Result<Option<ValueData>, String> {
match val {
Value::Null => Ok(None),
Value::Null => match transform.on_failure {
Some(OnFailure::Ignore) => Ok(None),
Some(OnFailure::Default) => transform
.get_default()
.map(|default| coerce_value(default, transform))
.unwrap_or_else(|| {
coerce_value(transform.get_type_matched_default_val(), transform)
}),
None => Ok(None),
},
Value::Int8(n) => coerce_i64_value(*n as i64, transform),
Value::Int16(n) => coerce_i64_value(*n as i64, transform),
@@ -160,14 +171,16 @@ pub(crate) fn coerce_value(
Value::Boolean(b) => coerce_bool_value(*b, transform),
Value::String(s) => coerce_string_value(s, transform),
Value::Time(Time { nanosecond, .. }) => {
Ok(Some(ValueData::TimestampNanosecondValue(*nanosecond)))
Value::Timestamp(Timestamp::Nanosecond(ns)) => {
Ok(Some(ValueData::TimestampNanosecondValue(*ns)))
}
Value::Epoch(Epoch::Nanosecond(ns)) => Ok(Some(ValueData::TimestampNanosecondValue(*ns))),
Value::Epoch(Epoch::Microsecond(us)) => Ok(Some(ValueData::TimestampMicrosecondValue(*us))),
Value::Epoch(Epoch::Millisecond(ms)) => Ok(Some(ValueData::TimestampMillisecondValue(*ms))),
Value::Epoch(Epoch::Second(s)) => Ok(Some(ValueData::TimestampSecondValue(*s))),
Value::Timestamp(Timestamp::Microsecond(us)) => {
Ok(Some(ValueData::TimestampMicrosecondValue(*us)))
}
Value::Timestamp(Timestamp::Millisecond(ms)) => {
Ok(Some(ValueData::TimestampMillisecondValue(*ms)))
}
Value::Timestamp(Timestamp::Second(s)) => Ok(Some(ValueData::TimestampSecondValue(*s))),
Value::Array(_) => unimplemented!("Array type not supported"),
Value::Map(_) => unimplemented!("Object type not supported"),
@@ -192,14 +205,7 @@ fn coerce_bool_value(b: bool, transform: &Transform) -> Result<Option<ValueData>
Value::Boolean(_) => ValueData::BoolValue(b),
Value::String(_) => ValueData::StringValue(b.to_string()),
Value::Time(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Time".to_string())
}
None => return Err("Boolean type not supported for Time".to_string()),
},
Value::Epoch(_) => match transform.on_failure {
Value::Timestamp(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Epoch".to_string())
@@ -234,15 +240,7 @@ fn coerce_i64_value(n: i64, transform: &Transform) -> Result<Option<ValueData>,
Value::Boolean(_) => ValueData::BoolValue(n != 0),
Value::String(_) => ValueData::StringValue(n.to_string()),
Value::Time(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Time".to_string())
}
None => return Err("Integer type not supported for Time".to_string()),
},
Value::Epoch(_) => match transform.on_failure {
Value::Timestamp(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Epoch".to_string())
@@ -277,15 +275,7 @@ fn coerce_u64_value(n: u64, transform: &Transform) -> Result<Option<ValueData>,
Value::Boolean(_) => ValueData::BoolValue(n != 0),
Value::String(_) => ValueData::StringValue(n.to_string()),
Value::Time(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Time".to_string())
}
None => return Err("Integer type not supported for Time".to_string()),
},
Value::Epoch(_) => match transform.on_failure {
Value::Timestamp(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Epoch".to_string())
@@ -320,15 +310,7 @@ fn coerce_f64_value(n: f64, transform: &Transform) -> Result<Option<ValueData>,
Value::Boolean(_) => ValueData::BoolValue(n != 0.0),
Value::String(_) => ValueData::StringValue(n.to_string()),
Value::Time(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Time".to_string())
}
None => return Err("Float type not supported for Time".to_string()),
},
Value::Epoch(_) => match transform.on_failure {
Value::Timestamp(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Epoch".to_string())
@@ -345,77 +327,68 @@ fn coerce_f64_value(n: f64, transform: &Transform) -> Result<Option<ValueData>,
Ok(Some(val))
}
fn coerce_string_value(s: &str, transform: &Transform) -> Result<Option<ValueData>, String> {
match transform.type_ {
Value::Int8(_) if s.parse::<i32>().is_ok() => {
Ok(Some(ValueData::I8Value(s.parse().unwrap())))
}
Value::Int16(_) if s.parse::<i32>().is_ok() => {
Ok(Some(ValueData::I16Value(s.parse().unwrap())))
}
Value::Int32(_) if s.parse::<i32>().is_ok() => {
Ok(Some(ValueData::I32Value(s.parse().unwrap())))
}
Value::Int64(_) if s.parse::<i64>().is_ok() => {
Ok(Some(ValueData::I64Value(s.parse().unwrap())))
}
Value::Uint8(_) if s.parse::<u32>().is_ok() => {
Ok(Some(ValueData::U8Value(s.parse().unwrap())))
}
Value::Uint16(_) if s.parse::<u32>().is_ok() => {
Ok(Some(ValueData::U16Value(s.parse().unwrap())))
}
Value::Uint32(_) if s.parse::<u32>().is_ok() => {
Ok(Some(ValueData::U32Value(s.parse().unwrap())))
}
Value::Uint64(_) if s.parse::<u64>().is_ok() => {
Ok(Some(ValueData::U64Value(s.parse().unwrap())))
}
Value::Float32(_) if s.parse::<f32>().is_ok() => {
Ok(Some(ValueData::F32Value(s.parse().unwrap())))
}
Value::Float64(_) if s.parse::<f64>().is_ok() => {
Ok(Some(ValueData::F64Value(s.parse().unwrap())))
}
Value::Boolean(_) if s.parse::<bool>().is_ok() => {
Ok(Some(ValueData::BoolValue(s.parse().unwrap())))
}
// on_failure
Value::Int8(_)
| Value::Int16(_)
| Value::Int32(_)
| Value::Int64(_)
| Value::Uint8(_)
| Value::Uint16(_)
| Value::Uint32(_)
| Value::Uint64(_)
| Value::Float32(_)
| Value::Float64(_)
| Value::Boolean(_) => match transform.on_failure {
Some(OnFailure::Ignore) => Ok(None),
Some(OnFailure::Default) => match transform.get_default() {
Some(default) => coerce_value(default, transform),
None => coerce_value(transform.get_type_matched_default_val(), transform),
macro_rules! coerce_string_value {
($s:expr, $transform:expr, $type:ident, $parse:ident) => {
match $s.parse::<$type>() {
Ok(v) => Ok(Some(ValueData::$parse(v))),
Err(_) => match $transform.on_failure {
Some(OnFailure::Ignore) => Ok(None),
Some(OnFailure::Default) => match $transform.get_default() {
Some(default) => coerce_value(default, $transform),
None => coerce_value($transform.get_type_matched_default_val(), $transform),
},
None => Err(format!(
"failed to coerce string value '{}' to type '{}'",
$s,
$transform.type_.to_str_type()
)),
},
None => Err(format!(
"failed to coerce string value '{s}' to type '{}'",
transform.type_.to_str_type()
)),
},
}
};
}
fn coerce_string_value(s: &String, transform: &Transform) -> Result<Option<ValueData>, String> {
match transform.type_ {
Value::Int8(_) => {
coerce_string_value!(s, transform, i32, I8Value)
}
Value::Int16(_) => {
coerce_string_value!(s, transform, i32, I16Value)
}
Value::Int32(_) => {
coerce_string_value!(s, transform, i32, I32Value)
}
Value::Int64(_) => {
coerce_string_value!(s, transform, i64, I64Value)
}
Value::Uint8(_) => {
coerce_string_value!(s, transform, u32, U8Value)
}
Value::Uint16(_) => {
coerce_string_value!(s, transform, u32, U16Value)
}
Value::Uint32(_) => {
coerce_string_value!(s, transform, u32, U32Value)
}
Value::Uint64(_) => {
coerce_string_value!(s, transform, u64, U64Value)
}
Value::Float32(_) => {
coerce_string_value!(s, transform, f32, F32Value)
}
Value::Float64(_) => {
coerce_string_value!(s, transform, f64, F64Value)
}
Value::Boolean(_) => {
coerce_string_value!(s, transform, bool, BoolValue)
}
Value::String(_) => Ok(Some(ValueData::StringValue(s.to_string()))),
Value::Time(_) => match transform.on_failure {
Some(OnFailure::Ignore) => Ok(None),
Some(OnFailure::Default) => Err("default value not supported for Time".to_string()),
None => Err("String type not supported for Time".to_string()),
},
Value::Epoch(_) => match transform.on_failure {
Value::Timestamp(_) => match transform.on_failure {
Some(OnFailure::Ignore) => Ok(None),
Some(OnFailure::Default) => Err("default value not supported for Epoch".to_string()),
None => Err("String type not supported for Epoch".to_string()),

View File

@@ -1,36 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::etl::transform::{Transformer, Transforms};
use crate::etl::value::Value;
pub struct NoopTransformer;
impl std::fmt::Display for NoopTransformer {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "NoopTransformer")
}
}
impl Transformer for NoopTransformer {
type Output = Value;
fn new(_transforms: Transforms) -> Result<Self, String> {
Ok(NoopTransformer)
}
fn transform(&self, val: Value) -> Result<Self::Output, String> {
Ok(val)
}
}

View File

@@ -16,15 +16,15 @@ pub mod array;
pub mod map;
pub mod time;
pub use std::collections::HashMap;
use ahash::{HashMap, HashMapExt};
pub use array::Array;
pub use map::Map;
pub use time::{Epoch, Time};
pub use time::Timestamp;
/// Value can act as both a value and a type:
/// as a value, the enclosed data is the actual value;
/// as a type, the enclosed data is the default value
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
// as value: null
@@ -47,8 +47,7 @@ pub enum Value {
Boolean(bool),
String(String),
Time(Time),
Epoch(Epoch),
Timestamp(Timestamp),
Array(Array),
Map(Map),
@@ -80,30 +79,26 @@ impl Value {
"boolean" => Ok(Value::Boolean(false)),
"string" => Ok(Value::String("".to_string())),
"time" => Ok(Value::Time(Time::default())),
"epoch" => match tail {
"timestamp" | "epoch" | "time" => match tail {
Some(resolution) if !resolution.is_empty() => match resolution.as_str() {
time::NANOSECOND_RESOLUTION | time::NANO_RESOLUTION | time::NS_RESOLUTION => {
Ok(Value::Epoch(Epoch::Nanosecond(0)))
Ok(Value::Timestamp(Timestamp::Nanosecond(0)))
}
time::MICROSECOND_RESOLUTION | time::MICRO_RESOLUTION | time::US_RESOLUTION => {
Ok(Value::Epoch(Epoch::Microsecond(0)))
Ok(Value::Timestamp(Timestamp::Microsecond(0)))
}
time::MILLISECOND_RESOLUTION | time::MILLI_RESOLUTION | time::MS_RESOLUTION => {
Ok(Value::Epoch(Epoch::Millisecond(0)))
Ok(Value::Timestamp(Timestamp::Millisecond(0)))
}
time::SECOND_RESOLUTION | time::SEC_RESOLUTION | time::S_RESOLUTION => {
Ok(Value::Epoch(Epoch::Second(0)))
Ok(Value::Timestamp(Timestamp::Second(0)))
}
_ => Err(format!(
"invalid resolution: '{resolution}'. Available resolutions: {}",
time::VALID_RESOLUTIONS.join(",")
)),
},
_ => Err(format!(
"resolution MUST BE set for epoch type: '{t}'. Available resolutions: {}",
time::VALID_RESOLUTIONS.join(", ")
)),
_ => Ok(Value::Timestamp(Timestamp::Nanosecond(0))),
},
"array" => Ok(Value::Array(Array::default())),
@@ -212,8 +207,7 @@ impl Value {
Value::Boolean(_) => "boolean",
Value::String(_) => "string",
Value::Time(_) => "time",
Value::Epoch(_) => "epoch",
Value::Timestamp(_) => "epoch",
Value::Array(_) => "array",
Value::Map(_) => "map",
@@ -244,8 +238,7 @@ impl std::fmt::Display for Value {
Value::Boolean(v) => format!("boolean({})", v),
Value::String(v) => format!("string({})", v),
Value::Time(v) => format!("time({})", v),
Value::Epoch(v) => format!("epoch({})", v),
Value::Timestamp(v) => format!("epoch({})", v),
Value::Array(v) => format!("{}", v),
Value::Map(v) => format!("{}", v),
@@ -275,14 +268,14 @@ impl TryFrom<serde_json::Value> for Value {
}
serde_json::Value::String(v) => Ok(Value::String(v)),
serde_json::Value::Array(v) => {
let mut values = vec![];
let mut values = Vec::with_capacity(v.len());
for v in v {
values.push(Value::try_from(v)?);
}
Ok(Value::Array(Array { values }))
}
serde_json::Value::Object(v) => {
let mut values = HashMap::new();
let mut values = HashMap::with_capacity(v.len());
for (k, v) in v {
values.insert(k, Value::try_from(v)?);
}

View File

@@ -12,15 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use ahash::{HashMap, HashMapExt};
use crate::etl::value::Value;
#[derive(Debug, Clone, PartialEq, Default)]
#[derive(Debug, Clone, PartialEq)]
pub struct Map {
pub values: HashMap<String, Value>,
}
impl Default for Map {
fn default() -> Self {
Self {
values: HashMap::with_capacity(30),
}
}
}
impl Map {
pub fn one(key: impl Into<String>, value: Value) -> Map {
let mut map = Map::default();
@@ -51,6 +59,12 @@ impl std::ops::Deref for Map {
}
}
impl std::ops::DerefMut for Map {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.values
}
}
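// Deref/DerefMut let Map be used directly as the HashMap it wraps (insert, get,
// extend), which the processor code above relies on. Standalone mirror, with i64
// standing in for Value:
use std::collections::HashMap;
use std::ops::{Deref, DerefMut};

struct MapSketch { values: HashMap<String, i64> }

impl Deref for MapSketch {
    type Target = HashMap<String, i64>;
    fn deref(&self) -> &Self::Target { &self.values }
}

impl DerefMut for MapSketch {
    fn deref_mut(&mut self) -> &mut Self::Target { &mut self.values }
}

fn main() {
    let mut m = MapSketch { values: HashMap::new() };
    m.insert("a".to_string(), 1); // resolves through DerefMut
    assert_eq!(m.get("a"), Some(&1)); // resolves through Deref
}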
impl std::fmt::Display for Map {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let values = self

View File

@@ -12,89 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::error;
#[derive(Debug, Clone, PartialEq)]
pub struct Time {
pub value: String,
pub nanosecond: i64,
pub format: Option<String>,
pub timezone: Option<String>,
// TODO(yuanbohan): support locale
// pub locale: Option<String>,
}
impl Time {
pub(crate) fn new(v: impl Into<String>, nanosecond: i64) -> Self {
let value = v.into();
Time {
value,
nanosecond,
format: None,
timezone: None,
}
}
pub(crate) fn with_format(&mut self, format: impl Into<String>) {
self.format = Some(format.into());
}
pub(crate) fn with_timezone(&mut self, timezone: Option<String>) {
self.timezone = timezone;
}
pub(crate) fn timestamp_nanos(&self) -> i64 {
self.nanosecond
}
pub(crate) fn timestamp_micros(&self) -> i64 {
self.nanosecond / 1_000
}
pub(crate) fn timestamp_millis(&self) -> i64 {
self.nanosecond / 1_000_000
}
pub(crate) fn timestamp(&self) -> i64 {
self.nanosecond / 1_000_000_000
}
}
impl Default for Time {
fn default() -> Self {
let dt = chrono::Utc::now();
let v = dt.to_rfc3339();
let ns = match dt.timestamp_nanos_opt() {
Some(ns) => ns,
None => {
error!("failed to get nanosecond from timestamp, use 0 instead");
0
}
};
Time::new(v, ns)
}
}
impl std::fmt::Display for Time {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let format = if let Some(format) = &self.format {
format!(", format: {}", format)
} else {
"".to_string()
};
let timezone = if let Some(timezone) = &self.timezone {
format!(", timezone: {}", timezone)
} else {
"".to_string()
};
write!(f, "{}, format: {}{}", self.value, format, timezone)
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum Epoch {
pub enum Timestamp {
Nanosecond(i64),
Microsecond(i64),
Millisecond(i64),
@@ -129,57 +48,57 @@ pub(crate) const VALID_RESOLUTIONS: [&str; 12] = [
S_RESOLUTION,
];
impl Epoch {
impl Timestamp {
pub(crate) fn timestamp_nanos(&self) -> i64 {
match self {
Epoch::Nanosecond(v) => *v,
Epoch::Microsecond(v) => *v * 1_000,
Epoch::Millisecond(v) => *v * 1_000_000,
Epoch::Second(v) => *v * 1_000_000_000,
Timestamp::Nanosecond(v) => *v,
Timestamp::Microsecond(v) => *v * 1_000,
Timestamp::Millisecond(v) => *v * 1_000_000,
Timestamp::Second(v) => *v * 1_000_000_000,
}
}
pub(crate) fn timestamp_micros(&self) -> i64 {
match self {
Epoch::Nanosecond(v) => *v / 1_000,
Epoch::Microsecond(v) => *v,
Epoch::Millisecond(v) => *v * 1_000,
Epoch::Second(v) => *v * 1_000_000,
Timestamp::Nanosecond(v) => *v / 1_000,
Timestamp::Microsecond(v) => *v,
Timestamp::Millisecond(v) => *v * 1_000,
Timestamp::Second(v) => *v * 1_000_000,
}
}
pub(crate) fn timestamp_millis(&self) -> i64 {
match self {
Epoch::Nanosecond(v) => *v / 1_000_000,
Epoch::Microsecond(v) => *v / 1_000,
Epoch::Millisecond(v) => *v,
Epoch::Second(v) => *v * 1_000,
Timestamp::Nanosecond(v) => *v / 1_000_000,
Timestamp::Microsecond(v) => *v / 1_000,
Timestamp::Millisecond(v) => *v,
Timestamp::Second(v) => *v * 1_000,
}
}
pub(crate) fn timestamp(&self) -> i64 {
match self {
Epoch::Nanosecond(v) => *v / 1_000_000_000,
Epoch::Microsecond(v) => *v / 1_000_000,
Epoch::Millisecond(v) => *v / 1_000,
Epoch::Second(v) => *v,
Timestamp::Nanosecond(v) => *v / 1_000_000_000,
Timestamp::Microsecond(v) => *v / 1_000_000,
Timestamp::Millisecond(v) => *v / 1_000,
Timestamp::Second(v) => *v,
}
}
}
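// Sanity check of the unit conversions above, using one of the millisecond inputs
// from the integration test later in this commit (1722580887794):
fn main() {
    let ms: i64 = 1_722_580_887_794;
    assert_eq!(ms / 1_000, 1_722_580_887); // Millisecond -> Second, as in timestamp()
    assert_eq!(ms * 1_000, 1_722_580_887_794_000); // Millisecond -> Microsecond, as in timestamp_micros()
}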
impl Default for Epoch {
impl Default for Timestamp {
fn default() -> Self {
Epoch::Nanosecond(chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0))
Timestamp::Nanosecond(chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0))
}
}
impl std::fmt::Display for Epoch {
impl std::fmt::Display for Timestamp {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let (value, resolution) = match self {
Epoch::Nanosecond(v) => (v, NANOSECOND_RESOLUTION),
Epoch::Microsecond(v) => (v, MICROSECOND_RESOLUTION),
Epoch::Millisecond(v) => (v, MILLISECOND_RESOLUTION),
Epoch::Second(v) => (v, SECOND_RESOLUTION),
Timestamp::Nanosecond(v) => (v, NANOSECOND_RESOLUTION),
Timestamp::Microsecond(v) => (v, MICROSECOND_RESOLUTION),
Timestamp::Millisecond(v) => (v, MILLISECOND_RESOLUTION),
Timestamp::Second(v) => (v, SECOND_RESOLUTION),
};
write!(f, "{}, resolution: {}", value, resolution)

View File

@@ -16,8 +16,9 @@ mod etl;
mod manager;
mod metrics;
pub use etl::transform::GreptimeTransformer;
pub use etl::value::Value;
pub use etl::processor::Processor;
pub use etl::transform::{GreptimeTransformer, Transformer};
pub use etl::value::{Array, Map, Value};
pub use etl::{parse, Content, Pipeline};
pub use manager::{
error, pipeline_operator, table, util, PipelineInfo, PipelineRef, PipelineTableRef,

View File

@@ -0,0 +1,373 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod common;
use api::v1::ColumnSchema;
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{ColumnDataType, SemanticType};
use lazy_static::lazy_static;
const TEST_INPUT: &str = r#"
{
"input_str": "2024-06-27T06:13:36.991Z"
}"#;
const TEST_VALUE: Option<ValueData> =
Some(ValueData::TimestampNanosecondValue(1719468816991000000));
lazy_static! {
static ref EXPECTED_SCHEMA: Vec<ColumnSchema> = vec![
common::make_column_schema(
"ts".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
}
#[test]
fn test_timestamp_parse_date() {
let pipeline_yaml = r#"
processors:
- timestamp:
fields:
- input_str
formats:
- "%Y-%m-%dT%H:%M:%S%.3fZ"
transform:
- fields:
- input_str, ts
type: time
"#;
let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(output.rows[0].values[0].value_data, TEST_VALUE);
}
#[test]
fn test_timestamp_multi_formats() {
let pipeline_yaml = r#"
processors:
- timestamp:
fields:
- input_str
formats:
- "%Y-%m-%dT%H:%M:%S"
- "%Y-%m-%dT%H:%M:%S%.3fZ"
transform:
- fields:
- input_str, ts
type: time
"#;
let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(output.rows[0].values[0].value_data, TEST_VALUE);
}
#[test]
fn test_timestamp_ignore_missing() {
{
let empty_input = r#"{}"#;
let pipeline_yaml = r#"
processors:
- timestamp:
fields:
- input_str
formats:
- "%Y-%m-%dT%H:%M:%S"
- "%Y-%m-%dT%H:%M:%S%.3fZ"
ignore_missing: true
transform:
- fields:
- input_str, ts
type: time
"#;
let output = common::parse_and_exec(empty_input, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(output.rows[0].values[0].value_data, None);
}
{
let empty_input = r#"{}"#;
let pipeline_yaml = r#"
processors:
- timestamp:
field: input_s
resolution: s
ignore_missing: true
transform:
- fields:
- input_s, ts
type: timestamp, s
"#;
let expected_schema = vec![
common::make_column_schema(
"ts".to_string(),
ColumnDataType::TimestampSecond,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
let output = common::parse_and_exec(empty_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
assert_eq!(output.rows[0].values[0].value_data, None);
}
}
#[test]
fn test_timestamp_timezone() {
let pipeline_yaml = r#"
processors:
- timestamp:
fields:
- input_str
formats:
- ["%Y-%m-%dT%H:%M:%S", "Asia/Shanghai"]
- ["%Y-%m-%dT%H:%M:%S%.3fZ", "Asia/Shanghai"]
ignore_missing: true
transform:
- fields:
- input_str, ts
type: time
"#;
let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(
output.rows[0].values[0].value_data,
Some(ValueData::TimestampNanosecondValue(1719440016991000000))
);
}
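Worked check for the expected value: with "Asia/Shanghai" (+08:00) attached to the format, the naive datetime is interpreted as local time, so the resulting UTC epoch is 8 hours (28_800 s) earlier than in test_timestamp_parse_date: 1719468816991000000 - 28_800 * 10^9 = 1719440016991000000.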
#[test]
fn test_timestamp_parse_epoch() {
let test_input = r#"
{
"input_s": "1722580862",
"input_sec": "1722580862",
"input_second": "1722580862",
"input_ms": "1722580887794",
"input_millisecond": "1722580887794",
"input_milli": "1722580887794",
"input_default": "1722580887794",
"input_us": "1722580905423969",
"input_microsecond": "1722580905423969",
"input_micro": "1722580905423969",
"input_ns": "1722580929863842048",
"input_nanosecond": "1722580929863842048",
"input_nano": "1722580929863842048"
}"#;
let pipeline_yaml = r#"
processors:
- timestamp:
field: input_s
resolution: s
- timestamp:
field: input_sec
resolution: sec
- timestamp:
field: input_second
resolution: second
- timestamp:
field: input_ms
resolution: ms
- timestamp:
field: input_millisecond
resolution: millisecond
- timestamp:
field: input_milli
resolution: milli
- timestamp:
field: input_default
- timestamp:
field: input_us
resolution: us
- timestamp:
field: input_microsecond
resolution: microsecond
- timestamp:
field: input_micro
resolution: micro
- timestamp:
field: input_ns
resolution: ns
- timestamp:
field: input_nanosecond
resolution: nanosecond
- timestamp:
field: input_nano
resolution: nano
transform:
- field: input_s
type: timestamp, s
- field: input_sec
type: timestamp, sec
- field: input_second
type: timestamp, second
- field: input_ms
type: timestamp, ms
- field: input_millisecond
type: timestamp, millisecond
- field: input_milli
type: timestamp, milli
- field: input_default
type: timestamp, milli
- field: input_us
type: timestamp, us
- field: input_microsecond
type: timestamp, microsecond
- field: input_micro
type: timestamp, micro
- field: input_ns
type: timestamp, ns
- field: input_nanosecond
type: timestamp, nanosecond
- field: input_nano
type: timestamp, nano
"#;
fn make_time_field(name: &str, datatype: ColumnDataType) -> ColumnSchema {
common::make_column_schema(name.to_string(), datatype, SemanticType::Field)
}
let expected_schema = vec![
make_time_field("input_s", ColumnDataType::TimestampSecond),
make_time_field("input_sec", ColumnDataType::TimestampSecond),
make_time_field("input_second", ColumnDataType::TimestampSecond),
make_time_field("input_ms", ColumnDataType::TimestampMillisecond),
make_time_field("input_millisecond", ColumnDataType::TimestampMillisecond),
make_time_field("input_milli", ColumnDataType::TimestampMillisecond),
make_time_field("input_default", ColumnDataType::TimestampMillisecond),
make_time_field("input_us", ColumnDataType::TimestampMicrosecond),
make_time_field("input_microsecond", ColumnDataType::TimestampMicrosecond),
make_time_field("input_micro", ColumnDataType::TimestampMicrosecond),
make_time_field("input_ns", ColumnDataType::TimestampNanosecond),
make_time_field("input_nanosecond", ColumnDataType::TimestampNanosecond),
make_time_field("input_nano", ColumnDataType::TimestampNanosecond),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
for i in 0..3 {
assert_eq!(
output.rows[0].values[i].value_data,
Some(ValueData::TimestampSecondValue(1722580862))
);
}
for i in 3..7 {
assert_eq!(
output.rows[0].values[i].value_data,
Some(ValueData::TimestampMillisecondValue(1722580887794))
);
}
for i in 7..10 {
assert_eq!(
output.rows[0].values[i].value_data,
Some(ValueData::TimestampMicrosecondValue(1722580905423969))
);
}
for i in 10..13 {
assert_eq!(
output.rows[0].values[i].value_data,
Some(ValueData::TimestampNanosecondValue(1722580929863842048))
);
}
}
#[test]
fn test_timestamp_default_wrong_resolution() {
let test_input = r#"
{
"input_s": "1722580862",
"input_nano": "1722583122284583936"
}"#;
let pipeline_yaml = r#"
processors:
- timestamp:
fields:
- input_s
- input_nano
transform:
- fields:
- input_s
type: timestamp, s
- fields:
- input_nano
type: timestamp, nano
"#;
let expected_schema = vec![
common::make_column_schema(
"input_s".to_string(),
ColumnDataType::TimestampSecond,
SemanticType::Field,
),
common::make_column_schema(
"input_nano".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
// this is actually wrong
// TODO(shuiyisong): add check for type when converting epoch
assert_eq!(
output.rows[0].values[0].value_data,
Some(ValueData::TimestampMillisecondValue(1722580862))
);
assert_eq!(
output.rows[0].values[1].value_data,
Some(ValueData::TimestampMillisecondValue(1722583122284583936))
);
}
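As the asserts show, both values come back as TimestampMillisecondValue (the processor's default resolution) even though the declared column types are second and nanosecond. For scale: a seconds epoch of 1722580862 reinterpreted as milliseconds lands in January 1970 rather than August 2024, which is the mismatch the TODO above tracks.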

View File

@@ -26,9 +26,9 @@ use axum::response::{IntoResponse, Response};
use axum::{async_trait, BoxError, Extension, TypedHeader};
use common_query::{Output, OutputData};
use common_telemetry::{error, warn};
use pipeline::error::{CastTypeSnafu, PipelineTransformSnafu};
use pipeline::error::PipelineTransformSnafu;
use pipeline::util::to_pipeline_version;
use pipeline::{PipelineVersion, Value as PipelineValue};
use pipeline::PipelineVersion;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::{Deserializer, Value};
@@ -194,43 +194,40 @@ pub async fn delete_pipeline(
}
/// Transform NDJSON array into a single array.
/// Always returns an array.
fn transform_ndjson_array_factory(
values: impl IntoIterator<Item = StdResult<Value, serde_json::Error>>,
ignore_error: bool,
) -> Result<Value> {
values.into_iter().try_fold(
Value::Array(Vec::with_capacity(100)),
|acc, item| match acc {
Value::Array(mut acc_array) => {
if let Ok(item_value) = item {
match item_value {
Value::Array(item_array) => {
acc_array.extend(item_array);
}
Value::Object(_) => {
acc_array.push(item_value);
}
_ => {
if !ignore_error {
warn!("invalid item in array: {:?}", item_value);
return InvalidParameterSnafu {
reason: format!("invalid item:{} in array", item_value),
}
.fail();
) -> Result<Vec<Value>> {
values
.into_iter()
.try_fold(Vec::with_capacity(100), |mut acc_array, item| match item {
Ok(item_value) => {
match item_value {
Value::Array(item_array) => {
acc_array.extend(item_array);
}
Value::Object(_) => {
acc_array.push(item_value);
}
_ => {
if !ignore_error {
warn!("invalid item in array: {:?}", item_value);
return InvalidParameterSnafu {
reason: format!("invalid item:{} in array", item_value),
}
.fail();
}
}
Ok(Value::Array(acc_array))
} else if !ignore_error {
item.context(ParseJsonSnafu)
} else {
warn!("invalid item in array: {:?}", item);
Ok(Value::Array(acc_array))
}
Ok(acc_array)
}
_ => unreachable!("invalid acc: {:?}", acc),
},
)
Err(_) if !ignore_error => item.map(|x| vec![x]).context(ParseJsonSnafu),
Err(_) => {
warn!("invalid item in array: {:?}", item);
Ok(acc_array)
}
})
}
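Returning Vec<Value> instead of a wrapped Value::Array spares one enum wrap/unwrap on the ingest path and lets callers iterate rows directly. A minimal sketch of the new contract (hypothetical input):

    let items = Deserializer::from_str("{\"a\": 1}\n[{\"b\": 2}]").into_iter();
    let rows = transform_ndjson_array_factory(items, false).unwrap();
    assert_eq!(rows.len(), 2); // the object and the array element are flattened together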
#[axum_macros::debug_handler]
@@ -284,26 +281,17 @@ fn extract_pipeline_value_by_content_type(
content_type: ContentType,
payload: String,
ignore_errors: bool,
) -> Result<PipelineValue> {
) -> Result<Vec<Value>> {
Ok(match content_type {
ct if ct == ContentType::json() => {
let json_value = transform_ndjson_array_factory(
Deserializer::from_str(&payload).into_iter(),
ignore_errors,
)?;
PipelineValue::try_from(json_value)
.map_err(|reason| CastTypeSnafu { msg: reason }.build())
.context(PipelineSnafu)?
}
ct if ct == ContentType::text() || ct == ContentType::text_utf8() => {
let arr = payload
.lines()
.filter(|line| !line.is_empty())
.map(|line| PipelineValue::String(line.to_string()))
.collect::<Vec<PipelineValue>>();
PipelineValue::Array(arr.into())
}
ct if ct == ContentType::json() => transform_ndjson_array_factory(
Deserializer::from_str(&payload).into_iter(),
ignore_errors,
)?,
ct if ct == ContentType::text() || ct == ContentType::text_utf8() => payload
.lines()
.filter(|line| !line.is_empty())
.map(|line| Value::String(line.to_string()))
.collect(),
_ => UnsupportedContentTypeSnafu { content_type }.fail()?,
})
}
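For text payloads each non-empty line becomes one Value::String row; a sketch of the expected behavior (hypothetical call, same module):

    let rows = extract_pipeline_value_by_content_type(ContentType::text(), "a\n\nb".to_string(), false).unwrap();
    assert_eq!(rows.len(), 2); // the empty line is filtered out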
@@ -313,7 +301,7 @@ async fn ingest_logs_inner(
pipeline_name: String,
version: PipelineVersion,
table_name: String,
pipeline_data: PipelineValue,
pipeline_data: Vec<Value>,
query_ctx: QueryContextRef,
) -> Result<HttpResponse> {
let db = query_ctx.get_db_string();
@@ -324,20 +312,41 @@ async fn ingest_logs_inner(
.await?;
let transform_timer = std::time::Instant::now();
let transformed_data: Rows = pipeline
.exec(pipeline_data)
.inspect(|_| {
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
})
.map_err(|reason| {
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
PipelineTransformSnafu { reason }.build()
})
.context(PipelineSnafu)?;
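// New streaming path: allocate one intermediate-state buffer up front, then
// reuse it for every input value via prepare -> exec_mut -> reset instead of
// building a fresh value tree per row.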
let mut intermediate_state = pipeline.init_intermediate_state();
let mut results = Vec::with_capacity(pipeline_data.len());
for v in pipeline_data {
pipeline
.prepare(v, &mut intermediate_state)
.map_err(|reason| {
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
PipelineTransformSnafu { reason }.build()
})
.context(PipelineSnafu)?;
let r = pipeline
.exec_mut(&mut intermediate_state)
.map_err(|reason| {
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
PipelineTransformSnafu { reason }.build()
})
.context(PipelineSnafu)?;
results.push(r);
pipeline.reset_intermediate_state(&mut intermediate_state);
}
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
let transformed_data: Rows = Rows {
rows: results,
schema: pipeline.schemas().clone(),
};
let insert_request = RowInsertRequest {
rows: Some(transformed_data),
@@ -394,27 +403,31 @@ mod tests {
#[test]
fn test_transform_ndjson() {
let s = "{\"a\": 1}\n{\"b\": 2}";
let a = transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false)
.unwrap()
.to_string();
let a = Value::Array(
transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
)
.to_string();
assert_eq!(a, "[{\"a\":1},{\"b\":2}]");
let s = "{\"a\": 1}";
let a = transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false)
.unwrap()
.to_string();
let a = Value::Array(
transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
)
.to_string();
assert_eq!(a, "[{\"a\":1}]");
let s = "[{\"a\": 1}]";
let a = transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false)
.unwrap()
.to_string();
let a = Value::Array(
transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
)
.to_string();
assert_eq!(a, "[{\"a\":1}]");
let s = "[{\"a\": 1}, {\"b\": 2}]";
let a = transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false)
.unwrap()
.to_string();
let a = Value::Array(
transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
)
.to_string();
assert_eq!(a, "[{\"a\":1},{\"b\":2}]");
}
}

View File

@@ -10,5 +10,6 @@ extend-exclude = [
"corrupted",
"tests-fuzz/src/data/lorem_words",
"*.sql",
"*.result"
"*.result",
"src/pipeline/benches/data.log"
]