feat: introduce pipeline crate (#4109)

* chore: introduce pipeline crate

* chore: fix typo
This commit is contained in:
shuiyisong
2024-06-06 01:23:25 +08:00
committed by GitHub
parent 16b85b06b6
commit 2ade511f26
24 changed files with 5531 additions and 0 deletions

View File

@@ -0,0 +1,13 @@
# pipeline
ETL capability
## processors
Refer to the [Elasticsearch ingest processors][elastic-ingest-processor] documentation for details.
### Example
See the [pipeline tests](../../tests/pipeline.rs) for usage examples.
[elastic-ingest-processor]: https://www.elastic.co/guide/en/elasticsearch/reference/current/processors.html

View File

@@ -0,0 +1,195 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use itertools::Itertools;
/// An ordered collection of [`Field`]s used by a processor or transform.
///
/// [`Fields::new`] enforces that the collection is non-empty and that source
/// field names are unique. NOTE(review): `derive(Default)` produces an empty
/// `Fields`, which bypasses those invariants — confirm callers treat the
/// default value as "not yet configured".
#[derive(Debug, Default, Clone)]
pub struct Fields(Vec<Field>);
impl Fields {
    /// Builds a validated `Fields` collection.
    ///
    /// # Errors
    /// Returns an error when `fields` is empty or contains duplicate source
    /// field names.
    pub(crate) fn new(fields: Vec<Field>) -> Result<Self, String> {
        let ff = Fields(fields);
        ff.check()
    }

    /// Wraps a single field; infallible because the result is non-empty by
    /// construction.
    pub(crate) fn one(field: Field) -> Self {
        Fields(vec![field])
    }

    /// Returns the output column name of every field (the rename target when
    /// set, otherwise the source field name).
    pub(crate) fn get_target_fields(&self) -> Vec<&str> {
        self.0.iter().map(|f| f.get_target_field()).collect()
    }

    /// Validates the invariants: non-empty, unique source field names.
    fn check(self) -> Result<Self, String> {
        if self.0.is_empty() {
            return Err("fields must not be empty".to_string());
        }

        let mut seen = std::collections::HashSet::new();
        for f in self.0.iter() {
            // `HashSet::insert` returns false when the value was already
            // present, so one lookup replaces the contains-then-insert pair.
            if !seen.insert(&f.field) {
                return Err(format!(
                    "field name must be unique, but got duplicated: {}",
                    f.field
                ));
            }
        }

        Ok(self)
    }
}
impl std::fmt::Display for Fields {
    /// Renders the collection as a `;`-separated list of its fields.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let rendered: Vec<String> = self.0.iter().map(|field| field.to_string()).collect();
        write!(f, "{}", rendered.join(";"))
    }
}
// Deref to the inner Vec so callers can iterate/index the fields directly.
impl std::ops::Deref for Fields {
    type Target = Vec<Field>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
/// A mapping from a source field in the input document to one or more output
/// columns.
#[derive(Debug, Default, Clone)]
pub struct Field {
    /// Source field name in the input document.
    pub field: String,

    /// Optional rename: the output column name when the processor produces a
    /// single value.
    pub target_field: Option<String>,

    /// Optional 1-to-many mapping for processors (e.g. `csv`) that split one
    /// source value into several output columns.
    pub target_fields: Option<Vec<String>>,
}
impl Field {
pub(crate) fn new(field: impl Into<String>) -> Self {
Field {
field: field.into(),
target_field: None,
target_fields: None,
}
}
// column_name in transform
pub(crate) fn get_target_field(&self) -> &str {
self.target_field.as_deref().unwrap_or(&self.field)
}
pub(crate) fn get_field(&self) -> &str {
&self.field
}
}
impl std::str::FromStr for Field {
    type Err = String;

    /// Parses `"<field>[, <target_field>[, <target_fields>...]]"`.
    ///
    /// Segments are comma-separated and trimmed. The first segment is the
    /// source field and must be non-empty; an empty second segment leaves
    /// `target_field` unset; empty later segments are skipped.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let mut parts = s.split(',');

        let field = match parts.next() {
            Some(first) => first.trim().to_string(),
            None => return Err("field is missing".to_string()),
        };
        if field.is_empty() {
            return Err("field is empty".to_string());
        }

        let target_field = parts
            .next()
            .map(str::trim)
            .filter(|t| !t.is_empty())
            .map(str::to_string);

        let rest: Vec<String> = parts
            .map(str::trim)
            .filter(|p| !p.is_empty())
            .map(str::to_string)
            .collect();
        let target_fields = if rest.is_empty() { None } else { Some(rest) };

        Ok(Field {
            field,
            target_field,
            target_fields,
        })
    }
}
impl std::fmt::Display for Field {
    /// Renders the field roughly in the format accepted by `FromStr`.
    ///
    /// NOTE(review): when both `target_field` and `target_fields` are set,
    /// only the source name is printed, so `Display` does not round-trip
    /// through `FromStr` in that case — confirm this is intentional.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        if let (Some(target), None) = (&self.target_field, &self.target_fields) {
            return write!(f, "{}, {target}", self.field);
        }
        if let (None, Some(targets)) = (&self.target_field, &self.target_fields) {
            return write!(f, "{}, {}", self.field, targets.join(","));
        }
        write!(f, "{}", self.field)
    }
}
#[cfg(test)]
mod tests {
    use crate::etl::field::Field;

    #[test]
    fn test_parse_field() {
        // The first comma-separated segment (the source field) may not be
        // blank, so whitespace-only, empty, and leading-comma inputs fail.
        let field: Result<Field, _> = " ".parse();
        assert!(field.is_err());

        let field: Result<Field, _> = ",".parse();
        assert!(field.is_err());

        let field: Result<Field, _> = ",field".parse();
        assert!(field.is_err());

        // (input, expected field, expected target_field, expected target_fields)
        let cases = [
            // ("field", "field", None, None),
            (
                "field, target_field",
                "field",
                Some("target_field".into()),
                None,
            ),
            // second segment becomes target_field; the remainder target_fields
            (
                "field, target_field1, target_field2, target_field3",
                "field",
                Some("target_field1".into()),
                Some(vec!["target_field2".into(), "target_field3".into()]),
            ),
            // an empty second segment leaves target_field unset
            (
                "field,, target_field1, target_field2, target_field3",
                "field",
                None,
                Some(vec![
                    "target_field1".into(),
                    "target_field2".into(),
                    "target_field3".into(),
                ]),
            ),
        ];

        for (s, field, target_field, target_fields) in cases.into_iter() {
            let f: Field = s.parse().unwrap();
            assert_eq!(f.get_field(), field, "{s}");
            assert_eq!(f.target_field, target_field, "{s}");
            assert_eq!(f.target_fields, target_fields, "{s}");
        }
    }
}

195
src/pipeline/src/etl/mod.rs Normal file
View File

@@ -0,0 +1,195 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
pub mod field;
pub mod processor;
pub mod transform;
pub mod value;
use itertools::Itertools;
use transform::{Transformer, Transforms};
use yaml_rust::YamlLoader;
// Top-level YAML keys recognized in a pipeline definition.
const DESCRIPTION: &str = "description";
const PROCESSORS: &str = "processors";
const TRANSFORM: &str = "transform";

/// Raw pipeline definition text, tagged with its source format.
/// Only `Yaml` is currently supported by [`parse`].
pub enum Content {
    Json(String),
    Yaml(String),
}
/// Parses a pipeline definition into a [`Pipeline`] whose output is produced
/// by the transformer `T`.
///
/// Missing `processors`/`transform` sections fall back to empty defaults.
///
/// # Errors
/// Returns an error when the YAML is malformed, empty, or when a processor /
/// transform / transformer fails to build.
///
/// # Panics
/// `Content::Json` is not implemented yet.
pub fn parse<T>(input: &Content) -> Result<Pipeline<T>, String>
where
    T: Transformer,
{
    match input {
        Content::Yaml(str) => {
            let docs = YamlLoader::load_from_str(str).map_err(|e| e.to_string())?;
            // `load_from_str` yields zero documents for empty input; indexing
            // `docs[0]` would panic, so surface a proper error instead.
            let doc = docs.first().ok_or("empty pipeline definition".to_string())?;

            let description = doc[DESCRIPTION].as_str().map(|s| s.to_string());

            let processors = if let Some(v) = doc[PROCESSORS].as_vec() {
                v.try_into()?
            } else {
                processor::Processors::default()
            };

            let transforms = if let Some(v) = doc[TRANSFORM].as_vec() {
                v.try_into()?
            } else {
                Transforms::default()
            };

            Ok(Pipeline {
                description,
                processors,
                transformer: T::new(transforms)?,
            })
        }
        Content::Json(_) => unimplemented!(),
    }
}
/// An executable ETL pipeline: a chain of processors followed by a transformer
/// that produces the final output (see [`Pipeline::exec`]).
#[derive(Debug, Clone)]
pub struct Pipeline<T>
where
    T: Transformer,
{
    /// Optional human-readable description from the `description` YAML key.
    description: Option<String>,
    /// Processors applied in order to the input value.
    processors: processor::Processors,
    /// Final stage that converts the processed value into `T::Output`.
    transformer: T,
    // pub on_failure: processor::Processors,
}
impl<T> std::fmt::Display for Pipeline<T>
where
    T: Transformer,
{
    /// Renders a human-readable summary: optional description, the kinds of
    /// the configured processors, and the transformer.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        if let Some(ref description) = self.description {
            writeln!(f, "description: {description}")?;
        }

        let kinds: Vec<&str> = self.processors.iter().map(|p| p.kind()).collect();
        writeln!(f, "processors: {}", kinds.join(","))?;

        writeln!(f, "transformer: {}", self.transformer)
    }
}
impl<T> Pipeline<T>
where
    T: Transformer,
{
    /// Runs `val` through every processor in order, then hands the result to
    /// the transformer. Stops at the first processor error.
    pub fn exec(&self, val: value::Value) -> Result<T::Output, String> {
        let processed = self
            .processors
            .iter()
            .try_fold(val, |acc, processor| processor.exec(acc))?;
        self.transformer.transform(processed)
    }
}
#[cfg(test)]
mod tests {
    use greptime_proto::v1::{self, ColumnDataType, SemanticType};

    use crate::etl::transform::GreptimeTransformer;
    use crate::etl::{parse, Content, Pipeline};

    /// End-to-end: a csv processor splits `my_field` into two uint32 columns;
    /// the pipeline should execute without error.
    #[test]
    fn test_csv_pipeline() {
        let input_value_str = r#"
            {
                "my_field": "1,2",
                "foo": "bar"
            }
        "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"
---
description: Pipeline for Apache Tomcat

processors:
  - csv:
      field: my_field, field1, field2

transform:
  - field: field1
    type: uint32
  - field: field2
    type: uint32
"#;
        let pipeline: Pipeline<GreptimeTransformer> =
            parse(&Content::Yaml(pipeline_yaml.into())).unwrap();
        let output = pipeline.exec(input_value.try_into().unwrap());
        assert!(output.is_ok());
    }

    /// End-to-end: a date processor parses `test_time` and the transform emits
    /// a single nanosecond-timestamp column with TIMESTAMP semantics.
    #[test]
    fn test_date_pipeline() {
        let input_value_str = r#"
            {
                "my_field": "1,2",
                "foo": "bar",
                "test_time": "2014-5-17T04:34:56+00:00"
            }
        "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"
---
description: Pipeline for Apache Tomcat

processors:
  - date:
      field: test_time

transform:
  - field: test_time
    type: time
    index: timestamp
"#;
        let pipeline: Pipeline<GreptimeTransformer> =
            parse(&Content::Yaml(pipeline_yaml.into())).unwrap();
        let output = pipeline.exec(input_value.try_into().unwrap()).unwrap();

        let schemas = output.schema;
        assert_eq!(schemas.len(), 1);

        let schema = schemas[0].clone();
        assert_eq!("test_time", schema.column_name);
        assert_eq!(ColumnDataType::TimestampNanosecond as i32, schema.datatype);
        assert_eq!(SemanticType::Timestamp as i32, schema.semantic_type);

        let row = output.rows[0].clone();
        assert_eq!(1, row.values.len());
        let value_data = row.values[0].clone().value_data;
        // 2014-05-17T04:34:56Z expressed in nanoseconds since the Unix epoch.
        assert_eq!(
            Some(v1::value::ValueData::TimestampNanosecondValue(
                1400301296000000000
            )),
            value_data
        );
    }
}

View File

@@ -0,0 +1,361 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use urlencoding::decode;
use crate::etl::field::{Field, Fields};
use crate::etl::processor::{
yaml_bool, yaml_field, yaml_fields, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME,
};
use crate::etl::value::{Map, Value};
pub(crate) const PROCESSOR_CMCD: &str = "cmcd";

// Reserved CMCD keys, per CTA-5004 section "Payload keys".
const CMCD_KEY_BR: &str = "br"; // Encoded bitrate, Integer kbps
const CMCD_KEY_BL: &str = "bl"; // Buffer length, Integer milliseconds
const CMCD_KEY_BS: &str = "bs"; // Buffer starvation, Boolean
const CMCD_KEY_CID: &str = "cid"; // Content ID, String
const CMCD_KEY_D: &str = "d"; // Object duration, Integer milliseconds
const CMCD_KEY_DL: &str = "dl"; // Deadline, Integer milliseconds
const CMCD_KEY_MTP: &str = "mtp"; // Measured throughput, Integer kbps
const CMCD_KEY_NOR: &str = "nor"; // Next object request, String
const CMCD_KEY_NRR: &str = "nrr"; // Next request range, String, "<range-start>-<range-end>"
const CMCD_KEY_OT: &str = "ot"; // Object type, Token - one of [m,a,v,av,i,c,tt,k,o]
const CMCD_KEY_PR: &str = "pr"; // Playback rate, Decimal
const CMCD_KEY_RTP: &str = "rtp"; // Requested maximum throughput, Integer kbps
const CMCD_KEY_SF: &str = "sf"; // Streaming format, Token - one of [d,h,s,o] (per CTA-5004; was mislabeled "Stall frequency")
const CMCD_KEY_SID: &str = "sid"; // Session ID, String
const CMCD_KEY_ST: &str = "st"; // Stream type, Token - one of [v,l]
const CMCD_KEY_SU: &str = "su"; // Startup, Boolean
const CMCD_KEY_TB: &str = "tb"; // Top bitrate, Integer kbps
const CMCD_KEY_V: &str = "v"; // Version
/// Common Media Client Data Specification:
/// https://cdn.cta.tech/cta/media/media/resources/standards/pdfs/cta-5004-final.pdf
///
///
/// The data payload for Header and Query Argument transmission consists of a series of
/// key/value pairs constructed according to the following rules:
/// 1. All information in the payload MUST be represented as <key>=<value> pairs.
/// 2. The key and value MUST be separated by an equals sign Unicode 0x3D. If the
/// value type is BOOLEAN and the value is TRUE, then the equals sign and the value
/// MUST be omitted.
/// 3. Successive key/value pairs MUST be delimited by a comma Unicode 0x2C.
/// 4. The key names described in this specification are reserved. Custom key names
/// may be used, but they MUST carry a hyphenated prefix to ensure that there will
/// not be a namespace collision with future revisions to this specification. Clients
/// SHOULD use a reverse-DNS syntax when defining their own prefix.
/// 5. If headers are used for data transmission, then custom keys SHOULD be
/// allocated to one of the four defined header names based upon their expected
/// level of variability:
/// a. CMCD-Request: keys whose values vary with each request.
/// b. CMCD-Object: keys whose values vary with the object being requested.
/// c. CMCD-Status: keys whose values do not vary with every request or object.
/// d. CMCD-Session: keys whose values are expected to be invariant over the life of the session.
/// 6. All key names are case-sensitive.
/// 7. Any value of type String MUST be enclosed by opening and closing double
/// quotes Unicode 0x22. Double quotes and backslashes MUST be escaped using a
/// backslash "\" Unicode 0x5C character. Any value of type Token does not require
/// quoting.
/// 8. All keys are OPTIONAL.
/// 9. Key-value pairs SHOULD be sequenced in alphabetical order of the key name in
/// order to reduce the fingerprinting surface exposed by the player.
/// 10. If the data payload is transmitted as a query argument, then the entire payload
/// string MUST be URLEncoded per [5]. Data payloads transmitted via headers
/// MUST NOT be URLEncoded.
/// 11. The data payload syntax is intended to be compliant with Structured Field Values for HTTP [6].
/// 12. Transport Layer Security SHOULD be used to protect all transmission of CMCD data.
#[derive(Debug, Default)]
pub struct CMCDProcessor {
    /// Source fields whose string values hold a (already URL-decoded) CMCD payload.
    fields: Fields,
    /// When true, a missing source field is tolerated.
    // NOTE(review): the skip-vs-error behavior lives in the generic Processor
    // driver, which is not visible in this file — confirm semantics there.
    ignore_missing: bool,
}
impl CMCDProcessor {
    fn with_fields(&mut self, fields: Fields) {
        self.fields = fields;
    }

    fn with_ignore_missing(&mut self, ignore_missing: bool) {
        self.ignore_missing = ignore_missing;
    }

    /// Parses a CMCD payload (`k=v` pairs separated by `,`) into a [`Map`],
    /// prefixing each key with `prefix` and converting values to the type the
    /// CMCD specification mandates for that key.
    fn parse(prefix: &str, s: &str) -> Result<Map, String> {
        let mut map = Map::default();

        for part in s.split(',') {
            let mut kv = part.split('=');
            let k = kv.next().ok_or(format!("{part} missing key in {s}"))?;
            let v = kv.next();

            // Fetches the value for keys that require one.
            let require_value = || v.ok_or(format!("{k} missing value in {s}"));

            let key = format!("{prefix}_{k}");
            match k {
                // standalone boolean keys: presence alone means `true`
                CMCD_KEY_BS | CMCD_KEY_SU => {
                    map.insert(key, Value::Boolean(true));
                }
                // integer-valued keys
                CMCD_KEY_BR | CMCD_KEY_BL | CMCD_KEY_D | CMCD_KEY_DL | CMCD_KEY_MTP
                | CMCD_KEY_RTP | CMCD_KEY_TB => {
                    let v = require_value()?;
                    let parsed = v
                        .parse::<i64>()
                        .map_err(|_| format!("failed to parse {v} as i64"))?;
                    map.insert(key, Value::Int64(parsed));
                }
                // string-valued keys, stored verbatim
                CMCD_KEY_CID | CMCD_KEY_NRR | CMCD_KEY_OT | CMCD_KEY_SF | CMCD_KEY_SID
                | CMCD_KEY_ST | CMCD_KEY_V => {
                    let v = require_value()?;
                    map.insert(key, Value::String(v.to_string()));
                }
                // next-object-request: URL-decoded when possible, raw otherwise
                CMCD_KEY_NOR => {
                    let v = require_value()?;
                    let decoded = match decode(v) {
                        Ok(cow) => cow.to_string(),
                        Err(_) => v.to_string(),
                    };
                    map.insert(key, Value::String(decoded));
                }
                // playback rate is the only decimal-valued reserved key
                CMCD_KEY_PR => {
                    let v = require_value()?;
                    let parsed = v
                        .parse::<f64>()
                        .map_err(|_| format!("failed to parse {v} as f64"))?;
                    map.insert(key, Value::Float64(parsed));
                }
                // custom / unknown keys
                _ => match v {
                    Some(v) => map.insert(key, Value::String(v.to_string())),
                    // NOTE(review): valueless unknown keys are inserted WITHOUT
                    // the prefix (`k`, not `key`). The unit tests codify this,
                    // but it is inconsistent with the valued case above —
                    // confirm intent before changing.
                    None => map.insert(k, Value::Boolean(true)),
                },
            }
        }

        Ok(map)
    }

    /// Parses one field's payload, using the rename target (or the source
    /// field name) as the key prefix.
    fn process_field(&self, val: &str, field: &Field) -> Result<Map, String> {
        let prefix = field
            .target_field
            .as_deref()
            .unwrap_or_else(|| field.get_field());
        Self::parse(prefix, val)
    }
}
impl TryFrom<&yaml_rust::yaml::Hash> for CMCDProcessor {
    type Error = String;

    /// Builds the processor from its YAML configuration hash.
    /// Recognized keys: `field`, `fields`, `ignore_missing`; others are ignored.
    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
        let mut processor = CMCDProcessor::default();

        for (k, v) in value.iter() {
            let Some(key) = k.as_str() else {
                return Err(format!("key must be a string, but got {k:?}"));
            };
            match key {
                FIELD_NAME => processor.with_fields(Fields::one(yaml_field(v, FIELD_NAME)?)),
                FIELDS_NAME => processor.with_fields(yaml_fields(v, FIELDS_NAME)?),
                IGNORE_MISSING_NAME => {
                    processor.with_ignore_missing(yaml_bool(v, IGNORE_MISSING_NAME)?)
                }
                _ => {}
            }
        }

        Ok(processor)
    }
}
impl crate::etl::processor::Processor for CMCDProcessor {
    fn kind(&self) -> &str {
        PROCESSOR_CMCD
    }

    fn ignore_missing(&self) -> bool {
        self.ignore_missing
    }

    fn fields(&self) -> &Fields {
        &self.fields
    }

    /// Only string values can carry a CMCD payload; anything else is an error.
    fn exec_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
        let Value::String(s) = val else {
            return Err(format!(
                "{} processor: expect string value, but got {val:?}",
                self.kind()
            ));
        };
        self.process_field(s, field)
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use urlencoding::decode;

    use super::CMCDProcessor;
    use crate::etl::value::{Map, Value};

    /// Each case pairs a percent-encoded CMCD query payload with the expected
    /// parsed key/value map (keys prefixed with "prefix_", except valueless
    /// unknown keys, which the parser currently leaves unprefixed).
    #[test]
    fn test_cmcd() {
        let ss = [
            (
                "sid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![(
                    "prefix_sid",
                    Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                )],
            ),
            (
                "br%3D3200%2Cbs%2Cd%3D4004%2Cmtp%3D25400%2Cot%3Dv%2Crtp%3D15000%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22%2Ctb%3D6000",
                vec![
                    ("prefix_bs", Value::Boolean(true)),
                    ("prefix_ot", Value::String("v".into())),
                    ("prefix_rtp", Value::Int64(15000)),
                    ("prefix_br", Value::Int64(3200)),
                    ("prefix_tb", Value::Int64(6000)),
                    ("prefix_d", Value::Int64(4004)),
                    (
                        "prefix_sid",
                        Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                    ),
                    ("prefix_mtp", Value::Int64(25400)),
                ],
            ),
            (
                // unknown valueless key "b" stays unprefixed
                "b%2Crtp%3D15000%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    (
                        "prefix_sid",
                        Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                    ),
                    ("prefix_rtp", Value::Int64(15000)),
                    ("b", Value::Boolean(true)),
                ],
            ),
            (
                "bs%2Csu",
                vec![
                    ("prefix_su", Value::Boolean(true)),
                    ("prefix_bs", Value::Boolean(true)),
                ],
            ),
            (
                // custom keys with values are stored as strings, prefixed
                "d%3D4004%2Ccom.example-myNumericKey%3D500%2Ccom.examplemyStringKey%3D%22myStringValue%22",
                vec![
                    (
                        "prefix_com.example-myNumericKey",
                        Value::String("500".into()),
                    ),
                    (
                        "prefix_com.examplemyStringKey",
                        Value::String("\"myStringValue\"".into()),
                    ),
                    ("prefix_d", Value::Int64(4004)),
                ],
            ),
            (
                // "nor" values are URL-decoded a second time
                "nor%3D%22..%252F300kbps%252Fsegment35.m4v%22%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    (
                        "prefix_sid",
                        Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                    ),
                    (
                        "prefix_nor",
                        Value::String("\"../300kbps/segment35.m4v\"".into()),
                    ),
                ],
            ),
            (
                "nrr%3D%2212323-48763%22%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    ("prefix_nrr", Value::String("\"12323-48763\"".into())),
                    (
                        "prefix_sid",
                        Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                    ),
                ],
            ),
            (
                "nor%3D%22..%252F300kbps%252Ftrack.m4v%22%2Cnrr%3D%2212323-48763%22%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    ("prefix_nrr", Value::String("\"12323-48763\"".into())),
                    (
                        "prefix_sid",
                        Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                    ),
                    (
                        "prefix_nor",
                        Value::String("\"../300kbps/track.m4v\"".into()),
                    ),
                ],
            ),
            (
                // kitchen-sink payload exercising every reserved key type
                "bl%3D21300%2Cbr%3D3200%2Cbs%2Ccid%3D%22faec5fc2-ac30-11eabb37-0242ac130002%22%2Cd%3D4004%2Cdl%3D18500%2Cmtp%3D48100%2Cnor%3D%22..%252F300kbps%252Ftrack.m4v%22%2Cnrr%3D%2212323-48763%22%2Cot%3Dv%2Cpr%3D1.08%2Crtp%3D12000%2Csf%3Dd%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22%2Cst%3Dv%2Csu%2Ctb%3D6000",
                vec![
                    ("prefix_bl", Value::Int64(21300)),
                    ("prefix_bs", Value::Boolean(true)),
                    ("prefix_st", Value::String("v".into())),
                    ("prefix_ot", Value::String("v".into())),
                    (
                        "prefix_sid",
                        Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                    ),
                    ("prefix_tb", Value::Int64(6000)),
                    ("prefix_d", Value::Int64(4004)),
                    (
                        "prefix_cid",
                        Value::String("\"faec5fc2-ac30-11eabb37-0242ac130002\"".into()),
                    ),
                    ("prefix_mtp", Value::Int64(48100)),
                    ("prefix_rtp", Value::Int64(12000)),
                    (
                        "prefix_nor",
                        Value::String("\"../300kbps/track.m4v\"".into()),
                    ),
                    ("prefix_sf", Value::String("d".into())),
                    ("prefix_br", Value::Int64(3200)),
                    ("prefix_nrr", Value::String("\"12323-48763\"".into())),
                    ("prefix_pr", Value::Float64(1.08)),
                    ("prefix_su", Value::Boolean(true)),
                    ("prefix_dl", Value::Int64(18500)),
                ],
            ),
        ];

        for (s, vec) in ss.into_iter() {
            // Inputs are stored percent-encoded; decode once before parsing.
            let decoded = decode(s).unwrap().to_string();
            let values = vec
                .into_iter()
                .map(|(k, v)| (k.to_string(), v))
                .collect::<HashMap<String, Value>>();
            let expected = Map { values };
            let actual = CMCDProcessor::parse("prefix", &decoded).unwrap();
            assert_eq!(actual, expected);
        }
    }
}

View File

@@ -0,0 +1,327 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/csv-processor.html
use std::collections::HashMap;
use csv::{ReaderBuilder, Trim};
use itertools::EitherOrBoth::{Both, Left, Right};
use itertools::Itertools;
use crate::etl::field::{Field, Fields};
use crate::etl::processor::{
yaml_bool, yaml_field, yaml_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
IGNORE_MISSING_NAME,
};
use crate::etl::value::{Map, Value};
pub(crate) const PROCESSOR_CSV: &str = "csv";

// YAML configuration keys specific to the csv processor.
const SEPARATOR_NAME: &str = "separator";
const QUOTE_NAME: &str = "quote";
const TRIM_NAME: &str = "trim";
const EMPTY_VALUE_NAME: &str = "empty_value";

/// Splits a string field into multiple output columns via CSV parsing.
/// only support string value
#[derive(Debug)]
pub struct CsvProcessor {
    /// Pre-configured CSV reader builder (headerless; delimiter/quote/trim applied).
    reader: ReaderBuilder,
    /// Source fields to process; each must carry `target_fields`.
    fields: Fields,
    /// When true, a missing source field is tolerated.
    ignore_missing: bool,
    // Value used to fill empty fields, empty fields will be skipped if this is not provided.
    empty_value: Option<String>,
    // Options not yet supported:
    // description
    // if
    // ignore_failure
    // on_failure
    // tag
}
impl CsvProcessor {
    /// Creates a processor with a headerless CSV reader and default settings.
    fn new() -> Self {
        let mut reader = ReaderBuilder::new();
        reader.has_headers(false);

        Self {
            reader,
            fields: Fields::default(),
            ignore_missing: false,
            empty_value: None,
        }
    }

    fn with_fields(&mut self, fields: Fields) {
        self.fields = fields;
    }

    /// Sets the CSV delimiter. The length check is in bytes, so only a
    /// single-byte (ASCII) separator is accepted.
    fn try_separator(&mut self, separator: String) -> Result<(), String> {
        if separator.len() != 1 {
            Err(format!(
                "'{}' must be a single character, but got '{}'",
                SEPARATOR_NAME, separator
            ))
        } else {
            self.reader.delimiter(separator.as_bytes()[0]);
            Ok(())
        }
    }

    /// Sets the CSV quote character; same single-byte restriction as the separator.
    fn try_quote(&mut self, quote: String) -> Result<(), String> {
        if quote.len() != 1 {
            Err(format!(
                "'{}' must be a single character, but got '{}'",
                QUOTE_NAME, quote
            ))
        } else {
            self.reader.quote(quote.as_bytes()[0]);
            Ok(())
        }
    }

    /// Enables/disables trimming of whitespace around fields and headers.
    fn with_trim(&mut self, trim: bool) {
        if trim {
            self.reader.trim(Trim::All);
        } else {
            self.reader.trim(Trim::None);
        }
    }

    fn with_ignore_missing(&mut self, ignore_missing: bool) {
        self.ignore_missing = ignore_missing;
    }

    /// Sets the filler used when there are more target fields than CSV columns.
    fn with_empty_value(&mut self, empty_value: String) {
        self.empty_value = Some(empty_value);
    }

    // process the csv format string to a map with target_fields as keys
    //
    // Only the FIRST CSV record of `val` is used. `zip_longest` pairs target
    // fields with extracted columns:
    // - Both       -> column value under the target field name
    // - Left only  -> more targets than columns: fill with `empty_value`, or Null
    // - Right only -> more columns than targets: extra columns are dropped
    fn process_field(&self, val: &str, field: &Field) -> Result<Map, String> {
        let mut reader = self.reader.from_reader(val.as_bytes());

        if let Some(result) = reader.records().next() {
            let record: csv::StringRecord = result.map_err(|e| e.to_string())?;

            let values: HashMap<String, Value> = field
                .target_fields
                .as_ref()
                .ok_or(format!(
                    "target fields must be set after '{}'",
                    field.get_field()
                ))?
                .iter()
                .map(|f| f.to_string())
                .zip_longest(record.iter())
                .filter_map(|zipped| match zipped {
                    Both(target_field, val) => Some((target_field, Value::String(val.into()))),
                    // if target fields are more than extracted fields, fill the rest with empty value
                    Left(target_field) => {
                        let value = self
                            .empty_value
                            .as_ref()
                            .map(|s| Value::String(s.clone()))
                            .unwrap_or(Value::Null);
                        Some((target_field, value))
                    }
                    // if extracted fields are more than target fields, ignore the rest
                    Right(_) => None,
                })
                .collect();

            Ok(Map { values })
        } else {
            Err("expected at least one record from csv format, but got none".into())
        }
    }
}
impl TryFrom<&yaml_rust::yaml::Hash> for CsvProcessor {
    type Error = String;

    /// Builds the processor from its YAML configuration hash.
    /// Recognized keys: `field`, `fields`, `separator`, `quote`, `trim`,
    /// `ignore_missing`, `empty_value`; others are ignored.
    fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
        let mut processor = CsvProcessor::new();

        for (k, v) in hash {
            let Some(key) = k.as_str() else {
                return Err(format!("key must be a string, but got {k:?}"));
            };
            match key {
                FIELD_NAME => processor.with_fields(Fields::one(yaml_field(v, FIELD_NAME)?)),
                FIELDS_NAME => processor.with_fields(yaml_fields(v, FIELDS_NAME)?),
                SEPARATOR_NAME => processor.try_separator(yaml_string(v, SEPARATOR_NAME)?)?,
                QUOTE_NAME => processor.try_quote(yaml_string(v, QUOTE_NAME)?)?,
                TRIM_NAME => processor.with_trim(yaml_bool(v, TRIM_NAME)?),
                IGNORE_MISSING_NAME => {
                    processor.with_ignore_missing(yaml_bool(v, IGNORE_MISSING_NAME)?)
                }
                EMPTY_VALUE_NAME => processor.with_empty_value(yaml_string(v, EMPTY_VALUE_NAME)?),
                _ => {}
            }
        }

        Ok(processor)
    }
}
impl Processor for CsvProcessor {
    fn kind(&self) -> &str {
        PROCESSOR_CSV
    }

    fn ignore_missing(&self) -> bool {
        self.ignore_missing
    }

    fn fields(&self) -> &Fields {
        &self.fields
    }

    /// Only string values can be split as CSV; anything else is an error.
    fn exec_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
        let Value::String(s) = val else {
            return Err(format!(
                "{} processor: expect string value, but got {val:?}",
                self.kind()
            ));
        };
        self.process_field(s, field)
    }
}
// TODO(yuanbohan): more test cases
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::{CsvProcessor, Value};
    use crate::etl::field::Fields;
    use crate::etl::processor::Processor;
    use crate::etl::value::Map;

    /// Target-field count equals CSV column count: every column is mapped and
    /// the original source field is kept.
    #[test]
    fn test_equal_length() {
        let mut processor = CsvProcessor::new();
        let field = "data,, a, b".parse().unwrap();
        processor.with_fields(Fields::one(field));

        let values: HashMap<String, Value> = [("data".into(), Value::String("1,2".into()))]
            .into_iter()
            .collect();
        let result = processor.exec(Value::Map(Map { values })).unwrap();

        let values = [
            ("data".into(), Value::String("1,2".into())),
            ("a".into(), Value::String("1".into())),
            ("b".into(), Value::String("2".into())),
        ]
        .into_iter()
        .collect();
        let expected = Value::Map(Map { values });

        assert_eq!(expected, result);
    }

    // test target_fields length larger than the record length
    #[test]
    fn test_target_fields_has_more_length() {
        let values = [("data".into(), Value::String("1,2".into()))]
            .into_iter()
            .collect();
        let input = Value::Map(Map { values });

        // with no empty value: the surplus target field is filled with Null
        {
            let mut processor = CsvProcessor::new();
            let field = "data,, a,b,c".parse().unwrap();
            processor.with_fields(Fields::one(field));

            let result = processor.exec(input.clone()).unwrap();

            let values = [
                ("data".into(), Value::String("1,2".into())),
                ("a".into(), Value::String("1".into())),
                ("b".into(), Value::String("2".into())),
                ("c".into(), Value::Null),
            ]
            .into_iter()
            .collect();
            let expected = Value::Map(Map { values });

            assert_eq!(expected, result);
        }

        // with empty value: the surplus target field gets the configured filler
        {
            let mut processor = CsvProcessor::new();
            let field = "data,, a,b,c".parse().unwrap();
            processor.with_fields(Fields::one(field));
            processor.with_empty_value("default".into());

            let result = processor.exec(input).unwrap();

            let values = [
                ("data".into(), Value::String("1,2".into())),
                ("a".into(), Value::String("1".into())),
                ("b".into(), Value::String("2".into())),
                ("c".into(), Value::String("default".into())),
            ]
            .into_iter()
            .collect();
            let expected = Value::Map(Map { values });

            assert_eq!(expected, result);
        }
    }

    // test record has larger length: surplus CSV columns are dropped
    #[test]
    fn test_target_fields_has_less_length() {
        let values = [("data".into(), Value::String("1,2,3".into()))]
            .into_iter()
            .collect();
        let input = Value::Map(Map { values });

        let mut processor = CsvProcessor::new();
        let field = "data,,a,b".parse().unwrap();
        processor.with_fields(Fields::one(field));

        let result = processor.exec(input).unwrap();

        let values = [
            ("data".into(), Value::String("1,2,3".into())),
            ("a".into(), Value::String("1".into())),
            ("b".into(), Value::String("2".into())),
        ]
        .into_iter()
        .collect();
        let expected = Value::Map(Map { values });

        assert_eq!(expected, result);
    }
}

View File

@@ -0,0 +1,345 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use chrono::{DateTime, NaiveDateTime};
use chrono_tz::Tz;
use lazy_static::lazy_static;
use crate::etl::field::{Field, Fields};
use crate::etl::processor::{
yaml_bool, yaml_field, yaml_fields, yaml_string, yaml_strings, Processor, FIELDS_NAME,
FIELD_NAME, IGNORE_MISSING_NAME,
};
use crate::etl::value::{Map, Time, Value};
pub(crate) const PROCESSOR_DATE: &str = "date";

// YAML configuration keys specific to the date processor.
const FORMATS_NAME: &str = "formats"; // default RFC3339
const TIMEZONE_NAME: &str = "timezone"; // default UTC
const LOCALE_NAME: &str = "locale";
const OUTPUT_FORMAT_NAME: &str = "output_format"; // default with input format

lazy_static! {
    /// Fallback chrono format strings tried (after sort/dedup) when the
    /// pipeline configures no `formats`. Covers RFC3339-style timestamps with
    /// and without sub-second precision and UTC offset.
    static ref DEFAULT_FORMATS: Vec<String> = vec![
                    // timezone with colon
                    "%Y-%m-%dT%H:%M:%S%:z",
                    "%Y-%m-%dT%H:%M:%S%.3f%:z",
                    "%Y-%m-%dT%H:%M:%S%.6f%:z",
                    "%Y-%m-%dT%H:%M:%S%.9f%:z",
                    // timezone without colon
                    "%Y-%m-%dT%H:%M:%S%z",
                    "%Y-%m-%dT%H:%M:%S%.3f%z",
                    "%Y-%m-%dT%H:%M:%S%.6f%z",
                    "%Y-%m-%dT%H:%M:%S%.9f%z",
                    // without timezone
                    "%Y-%m-%dT%H:%M:%SZ",
                    "%Y-%m-%dT%H:%M:%S",
                    "%Y-%m-%dT%H:%M:%S%.3f",
                    "%Y-%m-%dT%H:%M:%S%.6f",
                    "%Y-%m-%dT%H:%M:%S%.9f",
    ]
    .iter()
    .map(|s| s.to_string())
    .collect();
}
/// A sorted, deduplicated list of chrono format strings.
#[derive(Debug, Default)]
struct Formats(Vec<String>);

impl Formats {
    /// Normalizes the list: sorts, then removes adjacent duplicates
    /// (`dedup` relies on the preceding sort to drop all duplicates).
    fn new(formats: Vec<String>) -> Self {
        let mut list = formats;
        list.sort();
        list.dedup();
        Formats(list)
    }
}

// Deref to the inner Vec so callers can iterate the formats directly.
impl std::ops::Deref for Formats {
    type Target = Vec<String>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
/// Parses string timestamps into [`Time`] values using a list of candidate
/// chrono formats and an optional timezone.
#[derive(Debug, Default)]
pub struct DateProcessor {
    /// Source fields holding timestamp strings.
    fields: Fields,
    /// Candidate formats tried in (sorted) order; defaults to `DEFAULT_FORMATS`.
    formats: Formats,
    /// IANA timezone name applied to offset-less inputs; `None` means UTC.
    timezone: Option<String>,
    locale: Option<String>, // to support locale
    /// Stored but not yet applied anywhere visible in this file.
    output_format: Option<String>,
    /// When true, a missing source field is tolerated.
    ignore_missing: bool,
    // Options not yet supported:
    // description
    // if
    // ignore_failure
    // on_failure
    // tag
}
impl DateProcessor {
    fn with_fields(&mut self, fields: Fields) {
        self.fields = fields;
    }

    /// Installs the candidate formats, falling back to [`DEFAULT_FORMATS`]
    /// when none (or an empty list) is supplied.
    fn with_formats(&mut self, v: Option<Vec<String>>) {
        let formats = match v {
            Some(list) if !list.is_empty() => list,
            _ => DEFAULT_FORMATS.clone(),
        };
        self.formats = Formats::new(formats);
    }

    fn with_timezone(&mut self, timezone: String) {
        if !timezone.is_empty() {
            self.timezone = Some(timezone);
        }
    }

    fn with_locale(&mut self, locale: String) {
        if !locale.is_empty() {
            self.locale = Some(locale);
        }
    }

    fn with_output_format(&mut self, output_format: String) {
        if !output_format.is_empty() {
            self.output_format = Some(output_format);
        }
    }

    fn with_ignore_missing(&mut self, ignore_missing: bool) {
        self.ignore_missing = ignore_missing;
    }

    /// Tries every configured format in order; the first successful parse is
    /// returned as a [`Time`]. The configured timezone (default UTC) is used
    /// for formats that carry no offset.
    fn parse(&self, val: &str) -> Result<Time, String> {
        let tz = match self.timezone.as_deref() {
            Some(name) => name.parse::<Tz>().map_err(|e| e.to_string())?,
            None => Tz::UTC,
        };

        for fmt in self.formats.iter() {
            if let Ok(ns) = try_parse(val, fmt, tz) {
                let mut t = Time::new(val, ns);
                t.with_format(fmt);
                t.with_timezone(self.timezone.clone());
                return Ok(t);
            }
        }

        Err(format!("{} processor: failed to parse {val}", self.kind(),))
    }

    /// Writes the parsed time under the rename target (or the source name).
    fn process_field(&self, val: &str, field: &Field) -> Result<Map, String> {
        let key = field
            .target_field
            .as_deref()
            .unwrap_or_else(|| field.get_field());
        Ok(Map::one(key, Value::Time(self.parse(val)?)))
    }
}
impl TryFrom<&yaml_rust::yaml::Hash> for DateProcessor {
    type Error = String;

    /// Builds the processor from its YAML configuration hash.
    ///
    /// `formats` is collected first and applied once at the end so the
    /// default-format fallback runs even when the key is absent.
    fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
        let mut processor = DateProcessor::default();
        let mut formats_opt = None;

        for (k, v) in hash {
            let Some(key) = k.as_str() else {
                return Err(format!("key must be a string, but got {k:?}"));
            };
            match key {
                FIELD_NAME => processor.with_fields(Fields::one(yaml_field(v, FIELD_NAME)?)),
                FIELDS_NAME => processor.with_fields(yaml_fields(v, FIELDS_NAME)?),
                FORMATS_NAME => formats_opt = Some(yaml_strings(v, FORMATS_NAME)?),
                TIMEZONE_NAME => processor.with_timezone(yaml_string(v, TIMEZONE_NAME)?),
                LOCALE_NAME => processor.with_locale(yaml_string(v, LOCALE_NAME)?),
                OUTPUT_FORMAT_NAME => {
                    processor.with_output_format(yaml_string(v, OUTPUT_FORMAT_NAME)?)
                }
                IGNORE_MISSING_NAME => {
                    processor.with_ignore_missing(yaml_bool(v, IGNORE_MISSING_NAME)?)
                }
                _ => {}
            }
        }
        processor.with_formats(formats_opt);

        Ok(processor)
    }
}
impl Processor for DateProcessor {
    fn kind(&self) -> &str {
        PROCESSOR_DATE
    }

    fn ignore_missing(&self) -> bool {
        self.ignore_missing
    }

    fn fields(&self) -> &Fields {
        &self.fields
    }

    /// Only string values can be parsed as dates; anything else is an error.
    fn exec_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
        if let Value::String(s) = val {
            self.process_field(s, field)
        } else {
            Err(format!(
                "{} processor: expect string value, but got {val:?}",
                self.kind()
            ))
        }
    }
}
/// Try to parse `val` against `fmt` with an embedded timezone/offset first;
/// if that fails, parse it as a naive timestamp and localize it in `tz`.
///
/// Returns nanoseconds since the Unix epoch, or an error string when `val`
/// matches under neither interpretation, the local time is ambiguous or
/// nonexistent (DST transitions), or the instant overflows the i64
/// nanosecond range.
fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result<i64, String> {
    // First interpretation: the input carries its own offset (e.g. `%:z`).
    if let Ok(dt) = DateTime::parse_from_str(val, fmt) {
        return dt
            .timestamp_nanos_opt()
            .ok_or_else(|| "failed to get timestamp".to_string());
    }

    // Fallback: no offset in the input, so interpret the naive time in `tz`.
    // `single()` rejects ambiguous/nonexistent local times.
    NaiveDateTime::parse_from_str(val, fmt)
        .map_err(|e| e.to_string())?
        .and_local_timezone(tz)
        .single()
        .ok_or("failed to get local timezone")?
        .timestamp_nanos_opt()
        .ok_or_else(|| "failed to get timestamp".to_string())
}
#[cfg(test)]
mod tests {
    use chrono_tz::Asia::Tokyo;

    use crate::etl::processor::date::{try_parse, DateProcessor};

    /// Sample inputs shared by the `parse` tests below: naive, `Z`-suffixed,
    /// and offset-bearing timestamps in both `%:z` and `%z` styles.
    /// (The originals built these via a redundant `.into_iter().collect()`.)
    fn sample_values() -> Vec<&'static str> {
        vec![
            "2014-5-17T12:34:56",
            "2014-5-17T12:34:56Z",
            "2014-5-17T12:34:56+09:30",
            "2014-5-17T12:34:56.000+09:30",
            "2014-5-17T12:34:56-0930",
            "2014-5-17T12:34:56.000-0930",
        ]
    }

    #[test]
    fn test_try_parse() {
        let time_with_tz = "2014-5-17T04:34:56+00:00";
        let fmt_with_tz = "%Y-%m-%dT%H:%M:%S%:z";

        let time_without_tz = "2014-5-17T13:34:56";
        let fmt_without_tz = "%Y-%m-%dT%H:%M:%S";

        let tz = Tokyo;

        let parsed_with_tz = try_parse(time_with_tz, fmt_with_tz, tz);
        assert!(parsed_with_tz.is_ok());

        let parsed_without_tz = try_parse(time_without_tz, fmt_without_tz, tz);
        assert!(parsed_without_tz.is_ok());

        // Tokyo is UTC+9, so 13:34:56 local equals 04:34:56 UTC.
        assert_eq!(parsed_with_tz.unwrap(), parsed_without_tz.unwrap());
    }

    #[test]
    fn test_parse() {
        let mut processor = DateProcessor::default();
        processor.with_formats(None);

        for value in sample_values() {
            let parsed = processor.parse(value);
            assert!(parsed.is_ok());
        }
    }

    #[test]
    fn test_parse_with_formats() {
        let mut processor = DateProcessor::default();
        let formats = vec![
            "%Y-%m-%dT%H:%M:%S%:z",
            "%Y-%m-%dT%H:%M:%S%.3f%:z",
            "%Y-%m-%dT%H:%M:%S",
            "%Y-%m-%dT%H:%M:%SZ",
        ]
        .into_iter()
        .map(|s| s.to_string())
        .collect();
        processor.with_formats(Some(formats));

        for value in sample_values() {
            let parsed = processor.parse(value);
            assert!(parsed.is_ok());
        }
    }

    #[test]
    fn test_parse_with_timezone() {
        let mut processor = DateProcessor::default();
        processor.with_formats(None);
        processor.with_timezone("Asia/Tokyo".to_string());

        for value in sample_values() {
            let parsed = processor.parse(value);
            assert!(parsed.is_ok());
        }
    }
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,205 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::etl::field::{Field, Fields};
use crate::etl::processor::{
yaml_bool, yaml_field, yaml_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
IGNORE_MISSING_NAME,
};
use crate::etl::value::time::{
MICROSECOND_RESOLUTION, MICRO_RESOLUTION, MILLISECOND_RESOLUTION, MILLI_RESOLUTION,
MS_RESOLUTION, NANOSECOND_RESOLUTION, NANO_RESOLUTION, NS_RESOLUTION, SECOND_RESOLUTION,
SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION,
};
use crate::etl::value::{Epoch, Map, Value};
pub(crate) const PROCESSOR_EPOCH: &str = "epoch";
const RESOLUTION_NAME: &str = "resolution";
/// Timestamp resolution used by the epoch processor; defaults to milliseconds.
#[derive(Debug, Default)]
enum Resolution {
    Second,
    #[default]
    Milli,
    Micro,
    Nano,
}

impl TryFrom<&str> for Resolution {
    type Error = String;

    /// Maps a resolution keyword onto a variant; each resolution accepts
    /// three aliases (e.g. "second" / "sec" / "s"), defined in
    /// `crate::etl::value::time`.
    fn try_from(s: &str) -> Result<Self, Self::Error> {
        match s {
            SECOND_RESOLUTION | SEC_RESOLUTION | S_RESOLUTION => Ok(Resolution::Second),
            MILLISECOND_RESOLUTION | MILLI_RESOLUTION | MS_RESOLUTION => Ok(Resolution::Milli),
            MICROSECOND_RESOLUTION | MICRO_RESOLUTION | US_RESOLUTION => Ok(Resolution::Micro),
            NANOSECOND_RESOLUTION | NANO_RESOLUTION | NS_RESOLUTION => Ok(Resolution::Nano),
            _ => Err(format!("invalid resolution: {s}")),
        }
    }
}
/// support string, integer, float, time, epoch
///
/// Converts a field value into an `Epoch` timestamp at the configured
/// resolution.
#[derive(Debug, Default)]
pub struct EpochProcessor {
    fields: Fields,
    resolution: Resolution,
    ignore_missing: bool,
    // Reserved config keys, not implemented yet:
    // description
    // if
    // ignore_failure
    // on_failure
    // tag
}
impl EpochProcessor {
    fn with_fields(&mut self, fields: Fields) {
        self.fields = fields
    }

    fn with_resolution(&mut self, resolution: Resolution) {
        self.resolution = resolution;
    }

    fn with_ignore_missing(&mut self, ignore_missing: bool) {
        self.ignore_missing = ignore_missing;
    }

    /// Converts `val` to an `Epoch` at the configured resolution.
    ///
    /// Numbers (and numeric strings) are taken verbatim as tick counts in
    /// that resolution; floats are truncated toward zero (saturating) by
    /// the `as i64` cast. `Time`/`Epoch` inputs are re-read at the
    /// configured resolution.
    fn parse(&self, val: &Value) -> Result<Epoch, String> {
        let t: i64 = match val {
            Value::String(s) => s.parse::<i64>().map_err(|e| e.to_string())?,
            Value::Int16(i) => *i as i64,
            Value::Int32(i) => *i as i64,
            Value::Int64(i) => *i,
            Value::Uint8(i) => *i as i64,
            Value::Uint16(i) => *i as i64,
            Value::Uint32(i) => *i as i64,
            // NOTE(review): u64 values above i64::MAX wrap negative here.
            Value::Uint64(i) => *i as i64,
            Value::Float32(f) => *f as i64,
            Value::Float64(f) => *f as i64,

            Value::Time(t) => match self.resolution {
                Resolution::Second => t.timestamp(),
                Resolution::Milli => t.timestamp_millis(),
                Resolution::Micro => t.timestamp_micros(),
                Resolution::Nano => t.timestamp_nanos(),
            },

            Value::Epoch(e) => match self.resolution {
                Resolution::Second => e.timestamp(),
                Resolution::Milli => e.timestamp_millis(),
                Resolution::Micro => e.timestamp_micros(),
                Resolution::Nano => e.timestamp_nanos(),
            },

            _ => {
                return Err(format!(
                    "{PROCESSOR_EPOCH} processor: unsupported value {val}"
                ))
            }
        };

        // Wrap the raw tick count in the variant matching the resolution.
        match self.resolution {
            Resolution::Second => Ok(Epoch::Second(t)),
            Resolution::Milli => Ok(Epoch::Millisecond(t)),
            Resolution::Micro => Ok(Epoch::Microsecond(t)),
            Resolution::Nano => Ok(Epoch::Nanosecond(t)),
        }
    }

    /// Builds a single-entry map keyed by the field's target (or source)
    /// name, holding the parsed epoch value.
    fn process_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
        let key = match field.target_field {
            Some(ref target_field) => target_field,
            None => field.get_field(),
        };

        Ok(Map::one(key, Value::Epoch(self.parse(val)?)))
    }
}
impl TryFrom<&yaml_rust::yaml::Hash> for EpochProcessor {
    type Error = String;

    /// Builds an `EpochProcessor` from a pipeline-config YAML hash;
    /// unknown keys are silently ignored.
    fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
        let mut processor = EpochProcessor::default();

        for (k, v) in hash {
            let key = k
                .as_str()
                .ok_or(format!("key must be a string, but got {k:?}"))?;

            match key {
                FIELD_NAME => {
                    processor.with_fields(Fields::one(yaml_field(v, FIELD_NAME)?));
                }
                FIELDS_NAME => {
                    processor.with_fields(yaml_fields(v, FIELDS_NAME)?);
                }
                RESOLUTION_NAME => {
                    // `try_into` routes through `TryFrom<&str> for Resolution`.
                    let s = yaml_string(v, RESOLUTION_NAME)?.as_str().try_into()?;
                    processor.with_resolution(s);
                }
                IGNORE_MISSING_NAME => {
                    processor.with_ignore_missing(yaml_bool(v, IGNORE_MISSING_NAME)?);
                }
                _ => {}
            }
        }

        Ok(processor)
    }
}
impl Processor for EpochProcessor {
    fn kind(&self) -> &str {
        PROCESSOR_EPOCH
    }

    fn ignore_missing(&self) -> bool {
        self.ignore_missing
    }

    fn fields(&self) -> &Fields {
        &self.fields
    }

    /// Unlike the string-only processors, epoch accepts several value
    /// kinds; type checking happens inside `parse`.
    fn exec_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
        self.process_field(val, field)
    }
}
#[cfg(test)]
mod tests {
    use super::EpochProcessor;
    use crate::etl::value::Value;

    #[test]
    fn test_parse_epoch() {
        let mut processor = EpochProcessor::default();
        processor.with_resolution(super::Resolution::Second);

        // The same instant expressed as a string, signed/unsigned ints and a
        // float must all normalize to the identical second-resolution epoch.
        let values = [
            Value::String("1573840000".into()),
            Value::Int32(1573840000),
            Value::Uint64(1573840000),
            Value::Float32(1573840000.0),
        ];

        for value in values.iter() {
            let parsed = processor.parse(value).unwrap();
            assert_eq!(parsed, super::Epoch::Second(1573840000));
        }
    }
}

View File

@@ -0,0 +1,188 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::etl::field::{Field, Fields};
use crate::etl::processor::{
yaml_bool, yaml_field, yaml_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
IGNORE_MISSING_NAME, METHOD_NAME,
};
use crate::etl::value::{Map, Value};
pub(crate) const PROCESSOR_LETTER: &str = "letter";
/// Case transformation applied by the letter processor; defaults to lowercase.
#[derive(Debug, Default)]
enum Method {
    Upper,
    #[default]
    Lower,
    Capital,
}

impl std::fmt::Display for Method {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Method::Upper => write!(f, "upper"),
            Method::Lower => write!(f, "lower"),
            Method::Capital => write!(f, "capital"),
        }
    }
}

impl std::str::FromStr for Method {
    type Err = String;

    /// Parses a method name case-insensitively ("upper" / "lower" / "capital").
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "upper" => Ok(Method::Upper),
            "lower" => Ok(Method::Lower),
            "capital" => Ok(Method::Capital),
            _ => Err(format!("invalid method: {s}")),
        }
    }
}
/// only support string value
///
/// Rewrites string fields to upper/lower/capitalized case.
#[derive(Debug, Default)]
pub struct LetterProcessor {
    fields: Fields,
    method: Method,
    ignore_missing: bool,
}
impl LetterProcessor {
    fn with_fields(&mut self, fields: Fields) {
        self.fields = fields;
    }

    fn with_method(&mut self, method: Method) {
        self.method = method;
    }

    fn with_ignore_missing(&mut self, ignore_missing: bool) {
        self.ignore_missing = ignore_missing;
    }

    /// Applies the configured case transform to `val` and returns a
    /// single-entry map keyed by the field's target (or source) name.
    fn process_field(&self, val: &str, field: &Field) -> Result<Map, String> {
        let transformed = match self.method {
            Method::Upper => val.to_uppercase(),
            Method::Lower => val.to_lowercase(),
            Method::Capital => capitalize(val),
        };

        let key = match field.target_field {
            Some(ref target_field) => target_field,
            None => field.get_field(),
        };

        Ok(Map::one(key, Value::String(transformed)))
    }
}
impl TryFrom<&yaml_rust::yaml::Hash> for LetterProcessor {
    type Error = String;

    /// Builds a `LetterProcessor` from a pipeline-config YAML hash;
    /// unknown keys are silently ignored.
    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
        let mut processor = LetterProcessor::default();

        for (k, v) in value.iter() {
            let key = k
                .as_str()
                .ok_or(format!("key must be a string, but got {k:?}"))?;

            match key {
                FIELD_NAME => {
                    processor.with_fields(Fields::one(yaml_field(v, FIELD_NAME)?));
                }
                FIELDS_NAME => {
                    processor.with_fields(yaml_fields(v, FIELDS_NAME)?);
                }
                METHOD_NAME => {
                    // Routed through `FromStr for Method`.
                    let method = yaml_string(v, METHOD_NAME)?;
                    processor.with_method(method.parse()?);
                }
                IGNORE_MISSING_NAME => {
                    processor.with_ignore_missing(yaml_bool(v, IGNORE_MISSING_NAME)?);
                }
                _ => {}
            }
        }

        Ok(processor)
    }
}
impl Processor for LetterProcessor {
    fn kind(&self) -> &str {
        PROCESSOR_LETTER
    }

    fn ignore_missing(&self) -> bool {
        self.ignore_missing
    }

    fn fields(&self) -> &Fields {
        &self.fields
    }

    /// Only string values can be case-transformed; anything else is an error.
    fn exec_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
        match val {
            Value::String(val) => self.process_field(val, field),
            _ => Err(format!(
                "{} processor: expect string value, but got {val:?}",
                self.kind()
            )),
        }
    }
}
/// Uppercases only the first character of `s`, leaving the rest untouched.
/// The first char is run through `to_uppercase()`, which may expand to
/// multiple characters (e.g. "ß" -> "SS").
fn capitalize(s: &str) -> String {
    let mut chars = s.chars();
    match chars.next() {
        Some(first) => first.to_uppercase().chain(chars).collect(),
        None => String::new(),
    }
}
#[cfg(test)]
mod tests {
    use crate::etl::field::Fields;
    use crate::etl::processor::letter::{LetterProcessor, Method};
    use crate::etl::value::{Map, Value};

    #[test]
    fn test_process() {
        let field = "letter";
        let ff: crate::etl::processor::Field = field.parse().unwrap();
        let mut processor = LetterProcessor::default();
        processor.with_fields(Fields::one(ff.clone()));

        // Each method is exercised against the same single-field processor.
        {
            processor.with_method(Method::Upper);
            let processed = processor.process_field("pipeline", &ff).unwrap();
            assert_eq!(Map::one(field, Value::String("PIPELINE".into())), processed)
        }
        {
            processor.with_method(Method::Lower);
            let processed = processor.process_field("Pipeline", &ff).unwrap();
            assert_eq!(Map::one(field, Value::String("pipeline".into())), processed)
        }
        {
            // Capital only uppercases the first character.
            processor.with_method(Method::Capital);
            let processed = processor.process_field("pipeline", &ff).unwrap();
            assert_eq!(Map::one(field, Value::String("Pipeline".into())), processed)
        }
    }
}

View File

@@ -0,0 +1,224 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod cmcd;
pub mod csv;
pub mod date;
pub mod dissect;
pub mod epoch;
pub mod letter;
pub mod regex;
pub mod urlencoding;
use std::sync::Arc;
use cmcd::CMCDProcessor;
use common_telemetry::warn;
use csv::CsvProcessor;
use date::DateProcessor;
use dissect::DissectProcessor;
use epoch::EpochProcessor;
use letter::LetterProcessor;
use regex::RegexProcessor;
use urlencoding::UrlEncodingProcessor;
use crate::etl::field::{Field, Fields};
use crate::etl::value::{Array, Map, Value};
// YAML config keys shared by all processors.
const FIELD_NAME: &str = "field";
const FIELDS_NAME: &str = "fields";
const IGNORE_MISSING_NAME: &str = "ignore_missing";
const METHOD_NAME: &str = "method";
const PATTERNS_NAME: &str = "patterns";

// Reserved keys (see the elastic ingest processor reference in the crate
// README); not implemented yet.
// const IF_NAME: &str = "if";
// const IGNORE_FAILURE_NAME: &str = "ignore_failure";
// const ON_FAILURE_NAME: &str = "on_failure";
// const TAG_NAME: &str = "tag";
/// A single ETL step. Implementors declare which fields they read and how a
/// single value is transformed; the default `exec_*` methods fan that out
/// over maps and arrays.
pub trait Processor: std::fmt::Debug + Send + Sync + 'static {
    /// The fields this processor reads from the input map.
    fn fields(&self) -> &Fields;

    /// Short processor name, used as the YAML config key and in error messages.
    fn kind(&self) -> &str;

    /// When true, a configured field missing from the input map is skipped
    /// instead of failing `exec_map`.
    fn ignore_missing(&self) -> bool;

    /// When true (the default), non-map elements inside an array are logged
    /// and skipped rather than aborting `exec_array`.
    fn ignore_processor_array_failure(&self) -> bool {
        true
    }

    /// default behavior does nothing and returns the input value
    fn exec_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
        Ok(Map::one(field.get_field(), val.clone()))
    }

    /// Runs `exec_field` for every configured field present in `map`,
    /// merging each result back into the map.
    fn exec_map(&self, mut map: Map) -> Result<Value, String> {
        for ff @ Field { field, .. } in self.fields().iter() {
            match map.get(field) {
                Some(v) => {
                    map.extend(self.exec_field(v, ff)?);
                }
                None if self.ignore_missing() => {}
                None => {
                    return Err(format!(
                        "{} processor: field '{field}' is required but missing in {map}",
                        self.kind(),
                    ))
                }
            }
        }

        Ok(Value::Map(map))
    }

    /// Applies `exec_map` to each map element of `arr`.
    fn exec_array(&self, arr: Array) -> Result<Value, String> {
        let mut values = vec![];
        for val in arr.into_iter() {
            match val {
                Value::Map(map) => {
                    values.push(self.exec_map(map)?);
                }
                _ if self.ignore_processor_array_failure() => {
                    warn!("expected a map, but got {val}")
                }
                _ => return Err(format!("expected a map, but got {}", val)),
            }
        }

        Ok(Value::Array(Array { values }))
    }

    /// Entry point: dispatches on the top-level value shape.
    fn exec(&self, val: Value) -> Result<Value, String> {
        match val {
            Value::Map(map) => self.exec_map(map),
            Value::Array(arr) => self.exec_array(arr),
            _ => Err(format!("expected a map or array, but got {}", val)),
        }
    }
}
/// An ordered list of processors, executed in declaration order.
#[derive(Debug, Default, Clone)]
pub struct Processors {
    pub processors: Vec<Arc<dyn Processor>>,
}

impl Processors {
    /// Creates an empty processor list.
    pub fn new() -> Self {
        Self::default()
    }
}

impl std::ops::Deref for Processors {
    type Target = Vec<Arc<dyn Processor>>;

    fn deref(&self) -> &Self::Target {
        &self.processors
    }
}
impl TryFrom<&Vec<yaml_rust::Yaml>> for Processors {
type Error = String;
fn try_from(vec: &Vec<yaml_rust::Yaml>) -> Result<Self, Self::Error> {
let mut processors = vec![];
for doc in vec {
processors.push(parse_processor(doc)?);
}
Ok(Processors { processors })
}
}
/// Parses one `{ kind: { ...config } }` YAML document into a boxed processor.
///
/// # Errors
/// Returns an error when the document is not a map, has no key, the
/// processor kind is unknown or not a string, the kind's config is not a
/// map, or the kind-specific parsing fails.
fn parse_processor(doc: &yaml_rust::Yaml) -> Result<Arc<dyn Processor>, String> {
    let map = doc.as_hash().ok_or("processor must be a map".to_string())?;

    let key = map
        .keys()
        .next()
        .ok_or("processor must have a string key".to_string())?;

    // `key` was just taken from `map`, so the lookup cannot fail.
    // Previously this `expect`ed a hash and panicked on malformed user
    // config; surface it as a parse error instead.
    let value = map
        .get(key)
        .unwrap()
        .as_hash()
        .ok_or("processor value must be a map".to_string())?;

    let str_key = key
        .as_str()
        .ok_or("processor key must be a string".to_string())?;

    let processor: Arc<dyn Processor> = match str_key {
        cmcd::PROCESSOR_CMCD => Arc::new(CMCDProcessor::try_from(value)?),
        csv::PROCESSOR_CSV => Arc::new(CsvProcessor::try_from(value)?),
        date::PROCESSOR_DATE => Arc::new(DateProcessor::try_from(value)?),
        dissect::PROCESSOR_DISSECT => Arc::new(DissectProcessor::try_from(value)?),
        epoch::PROCESSOR_EPOCH => Arc::new(EpochProcessor::try_from(value)?),
        letter::PROCESSOR_LETTER => Arc::new(LetterProcessor::try_from(value)?),
        regex::PROCESSOR_REGEX => Arc::new(RegexProcessor::try_from(value)?),
        urlencoding::PROCESSOR_URL_ENCODING => Arc::new(UrlEncodingProcessor::try_from(value)?),
        _ => return Err(format!("unsupported {} processor", str_key)),
    };

    Ok(processor)
}
/// Reads a YAML scalar as a trimmed `String`; errors when it is not a string.
pub(crate) fn yaml_string(v: &yaml_rust::Yaml, field: &str) -> Result<String, String> {
    v.as_str()
        .map(|s| s.trim().to_string())
        .ok_or(format!("'{field}' must be a string"))
}

/// Reads a YAML list as `Vec<String>`.
///
/// NOTE(review): non-string list elements are silently converted to empty
/// strings via `unwrap_or_default` rather than rejected — confirm this
/// lenience is intended.
pub(crate) fn yaml_strings(v: &yaml_rust::Yaml, field: &str) -> Result<Vec<String>, String> {
    let vec = v
        .as_vec()
        .ok_or(format!("'{field}' must be a list of strings",))?
        .iter()
        .map(|v| v.as_str().unwrap_or_default().into())
        .collect();
    Ok(vec)
}

/// Reads a YAML scalar as a bool; errors when it is not a boolean.
pub(crate) fn yaml_bool(v: &yaml_rust::Yaml, field: &str) -> Result<bool, String> {
    v.as_bool().ok_or(format!("'{field}' must be a boolean"))
}
/// Reads a YAML string and parses it into `T` via its `FromStr` impl.
pub(crate) fn yaml_parse_string<T>(v: &yaml_rust::Yaml, field: &str) -> Result<T, String>
where
    T: std::str::FromStr,
    T::Err: ToString,
{
    yaml_string(v, field)?
        .parse::<T>()
        .map_err(|e| e.to_string())
}

/// Reads a YAML list of strings and parses each element into `T`,
/// short-circuiting on the first parse failure.
pub(crate) fn yaml_parse_strings<T>(v: &yaml_rust::Yaml, field: &str) -> Result<Vec<T>, String>
where
    T: std::str::FromStr,
    T::Err: ToString,
{
    yaml_strings(v, field).and_then(|v| {
        v.into_iter()
            .map(|s| s.parse::<T>().map_err(|e| e.to_string()))
            .collect()
    })
}

/// Parses a YAML list into `Fields` (which validates non-empty, unique names).
pub(crate) fn yaml_fields(v: &yaml_rust::Yaml, field: &str) -> Result<Fields, String> {
    let v = yaml_parse_strings(v, field)?;
    Fields::new(v)
}

/// Parses a YAML string into a single `Field`.
pub(crate) fn yaml_field(v: &yaml_rust::Yaml, field: &str) -> Result<Field, String> {
    yaml_parse_string(v, field)
}

View File

@@ -0,0 +1,315 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// field_name and prefix with comma separated, like:
// name, new_name
const PATTERNS_NAME: &str = "patterns";
pub(crate) const PROCESSOR_REGEX: &str = "regex";
use lazy_static::lazy_static;
use regex::Regex;
use crate::etl::field::Fields;
use crate::etl::processor::{
yaml_bool, yaml_field, yaml_fields, yaml_strings, Field, Processor, FIELDS_NAME, FIELD_NAME,
IGNORE_MISSING_NAME,
};
use crate::etl::value::{Map, Value};
lazy_static! {
    // Matches named capture groups — both `(?P<name>...)` and `(?<name>...)`
    // syntaxes — so group names can be extracted from a pattern string.
    static ref GROUPS_NAME_REGEX: Regex = Regex::new(r"\(\?P?<([[:word:]]+)>.+?\)").unwrap();
}

/// Extracts the named capture-group names appearing in a regex pattern string.
fn get_regex_group_names(s: &str) -> Vec<String> {
    GROUPS_NAME_REGEX
        .captures_iter(s)
        .filter_map(|c| c.get(1).map(|m| m.as_str().to_string()))
        .collect()
}
/// A compiled regex together with its original pattern text and the list of
/// named capture groups it defines.
#[derive(Debug)]
struct GroupRegex {
    origin: String,
    regex: Regex,
    groups: Vec<String>,
}

impl std::fmt::Display for GroupRegex {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let groups = self.groups.join(", ");
        write!(f, "{}, groups: [{groups}]", self.origin)
    }
}

impl std::str::FromStr for GroupRegex {
    type Err = String;

    /// Compiles `origin` and records its named groups. Patterns without at
    /// least one named group are rejected, since only named groups produce
    /// output in `RegexProcessor::process_field`.
    fn from_str(origin: &str) -> Result<Self, Self::Err> {
        let groups = get_regex_group_names(origin);
        if groups.is_empty() {
            return Err(format!("no named group found in regex {origin}"));
        }

        let regex = Regex::new(origin).map_err(|e| e.to_string())?;
        Ok(GroupRegex {
            origin: origin.into(),
            regex,
            groups,
        })
    }
}
/// only support string value
/// if no value found from a pattern, the target_field will be ignored
///
/// Output keys take the form `<prefix>_<group>`, where prefix is the field's
/// target (or source) name and group is the named capture group.
#[derive(Debug, Default)]
pub struct RegexProcessor {
    fields: Fields,
    patterns: Vec<GroupRegex>,
    ignore_missing: bool,
}
impl RegexProcessor {
    fn with_fields(&mut self, fields: Fields) {
        self.fields = fields;
    }

    /// Compiles every pattern into a `GroupRegex`, failing on the first
    /// invalid or group-less pattern.
    fn try_with_patterns(&mut self, patterns: Vec<String>) -> Result<(), String> {
        self.patterns = patterns
            .into_iter()
            .map(|pattern| pattern.parse())
            .collect::<Result<Vec<GroupRegex>, String>>()?;
        Ok(())
    }

    fn with_ignore_missing(&mut self, ignore_missing: bool) {
        self.ignore_missing = ignore_missing;
    }

    /// Validates that at least one field and one pattern were configured.
    fn check(self) -> Result<Self, String> {
        if self.fields.is_empty() {
            return Err(format!(
                "no valid field found in {} processor",
                PROCESSOR_REGEX
            ));
        }

        if self.patterns.is_empty() {
            return Err(format!(
                "no valid pattern found in {} processor",
                PROCESSOR_REGEX
            ));
        }

        Ok(self)
    }

    /// Runs one compiled pattern against `val`; every named group that
    /// captured contributes a `<prefix>_<group>` entry to the result map.
    /// A non-matching pattern yields an empty map.
    fn process_field(&self, val: &str, field: &Field, gr: &GroupRegex) -> Result<Map, String> {
        let mut map = Map::default();

        let captures = match gr.regex.captures(val) {
            Some(captures) => captures,
            None => return Ok(map),
        };

        // The key prefix does not depend on the group, so compute it once.
        let prefix = field.target_field.as_ref().unwrap_or(&field.field);

        for group in &gr.groups {
            if let Some(capture) = captures.name(group) {
                let key = format!("{prefix}_{group}");
                map.insert(key, Value::String(capture.as_str().to_string()));
            }
        }

        Ok(map)
    }
}
impl TryFrom<&yaml_rust::yaml::Hash> for RegexProcessor {
    type Error = String;

    /// Builds a `RegexProcessor` from a pipeline-config YAML hash.
    ///
    /// Unknown keys are ignored; `check` then enforces that at least one
    /// field and one pattern were configured.
    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
        let mut processor = RegexProcessor::default();

        for (k, v) in value.iter() {
            let key = k
                .as_str()
                .ok_or(format!("key must be a string, but got {k:?}"))?;

            match key {
                FIELD_NAME => {
                    processor.with_fields(Fields::one(yaml_field(v, FIELD_NAME)?));
                }
                FIELDS_NAME => {
                    processor.with_fields(yaml_fields(v, FIELDS_NAME)?);
                }
                PATTERNS_NAME => {
                    processor.try_with_patterns(yaml_strings(v, PATTERNS_NAME)?)?;
                }
                IGNORE_MISSING_NAME => {
                    processor.with_ignore_missing(yaml_bool(v, IGNORE_MISSING_NAME)?);
                }
                _ => {}
            }
        }

        processor.check()
    }
}
impl Processor for RegexProcessor {
    fn kind(&self) -> &str {
        PROCESSOR_REGEX
    }

    fn ignore_missing(&self) -> bool {
        self.ignore_missing
    }

    fn fields(&self) -> &Fields {
        &self.fields
    }

    /// Runs every configured pattern against the string value and merges
    /// all captured groups into one result map.
    fn exec_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
        let s = match val {
            Value::String(s) => s,
            _ => {
                return Err(format!(
                    "{} processor: expect string value, but got {val:?}",
                    self.kind()
                ))
            }
        };

        let mut map = Map::default();
        for gr in &self.patterns {
            map.extend(self.process_field(s, field, gr)?);
        }
        Ok(map)
    }
}
#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use super::RegexProcessor;
    use crate::etl::field::Fields;
    use crate::etl::processor::Processor;
    use crate::etl::value::{Map, Value};

    #[test]
    fn test_process() {
        let mut processor = RegexProcessor::default();

        // Breadcrumb-style entries: each bracketed record carries key=value
        // pairs (a=ip, b=request id, c=node type, n=geo, o=asn).
        let cc = "[c=c,n=US_CA_SANJOSE,o=55155]";
        let cg = "[a=12.34.567.89,b=12345678,c=g,n=US_CA_SANJOSE,o=20940]";
        let co = "[a=987.654.321.09,c=o]";
        let cp = "[c=p,n=US_CA_SANJOSE,o=55155]";
        let cw = "[c=w,n=US_CA_SANJOSE,o=55155]";
        let breadcrumbs = Value::String([cc, cg, co, cp, cw].iter().join(","));

        let values = [
            ("breadcrumbs", breadcrumbs.clone()),
            ("breadcrumbs_parent", Value::String(cc.to_string())),
            ("breadcrumbs_edge", Value::String(cg.to_string())),
            ("breadcrumbs_origin", Value::String(co.to_string())),
            ("breadcrumbs_peer", Value::String(cp.to_string())),
            ("breadcrumbs_wrapper", Value::String(cw.to_string())),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_string(), v))
        .collect();
        let temporary_map = Map { values };

        {
            // single field (with prefix), multiple patterns
            let ff = ["breadcrumbs, breadcrumbs"]
                .iter()
                .map(|f| f.parse().unwrap())
                .collect();
            processor.with_fields(Fields::new(ff).unwrap());

            // One named group per node type; each extracts a whole record.
            let ccr = "(?<parent>\\[[^\\[]*c=c[^\\]]*\\])";
            let cgr = "(?<edge>\\[[^\\[]*c=g[^\\]]*\\])";
            let cor = "(?<origin>\\[[^\\[]*c=o[^\\]]*\\])";
            let cpr = "(?<peer>\\[[^\\[]*c=p[^\\]]*\\])";
            let cwr = "(?<wrapper>\\[[^\\[]*c=w[^\\]]*\\])";
            let patterns = [ccr, cgr, cor, cpr, cwr]
                .iter()
                .map(|p| p.to_string())
                .collect();
            processor.try_with_patterns(patterns).unwrap();

            let mut map = Map::default();
            map.insert("breadcrumbs", breadcrumbs.clone());
            let processed_val = processor.exec_map(map).unwrap();

            assert_eq!(processed_val, Value::Map(temporary_map.clone()));
        }
        {
            // multiple fields (with prefix), multiple patterns
            let ff = [
                "breadcrumbs_parent, parent",
                "breadcrumbs_edge, edge",
                "breadcrumbs_origin, origin",
                "breadcrumbs_peer, peer",
                "breadcrumbs_wrapper, wrapper",
            ]
            .iter()
            .map(|f| f.parse().unwrap())
            .collect();
            processor.with_fields(Fields::new(ff).unwrap());

            let patterns = [
                "a=(?<ip>[^,\\]]+)",
                "b=(?<request_id>[^,\\]]+)",
                "k=(?<request_end_time>[^,\\]]+)",
                "l=(?<turn_around_time>[^,\\]]+)",
                "m=(?<dns_lookup_time>[^,\\]]+)",
                "n=(?<geo>[^,\\]]+)",
                "o=(?<asn>[^,\\]]+)",
            ]
            .iter()
            .map(|p| p.to_string())
            .collect();
            processor.try_with_patterns(patterns).unwrap();

            // Keys absent from a record (e.g. k/l/m) simply produce no entry.
            let new_values = vec![
                ("edge_ip", Value::String("12.34.567.89".to_string())),
                ("edge_request_id", Value::String("12345678".to_string())),
                ("edge_geo", Value::String("US_CA_SANJOSE".to_string())),
                ("edge_asn", Value::String("20940".to_string())),
                ("origin_ip", Value::String("987.654.321.09".to_string())),
                ("peer_asn", Value::String("55155".to_string())),
                ("peer_geo", Value::String("US_CA_SANJOSE".to_string())),
                ("parent_asn", Value::String("55155".to_string())),
                ("parent_geo", Value::String("US_CA_SANJOSE".to_string())),
                ("wrapper_asn", Value::String("55155".to_string())),
                ("wrapper_geo", Value::String("US_CA_SANJOSE".to_string())),
            ]
            .into_iter()
            .map(|(k, v)| (k.to_string(), v))
            .collect();

            let actual_val = processor.exec_map(temporary_map.clone()).unwrap();
            let mut expected_map = temporary_map.clone();
            expected_map.extend(Map { values: new_values });

            assert_eq!(Value::Map(expected_map), actual_val);
        }
    }
}

View File

@@ -0,0 +1,177 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use urlencoding::{decode, encode};
use crate::etl::field::{Field, Fields};
use crate::etl::processor::{
yaml_bool, yaml_field, yaml_fields, yaml_string, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME,
METHOD_NAME,
};
use crate::etl::value::{Map, Value};
pub(crate) const PROCESSOR_URL_ENCODING: &str = "urlencoding";
/// URL-encoding operation to apply; defaults to percent-decoding.
#[derive(Debug, Default)]
enum Method {
    #[default]
    Decode,
    Encode,
}

impl std::fmt::Display for Method {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Method::Decode => write!(f, "decode"),
            Method::Encode => write!(f, "encode"),
        }
    }
}

impl std::str::FromStr for Method {
    type Err = String;

    /// Parses a method name ("decode" / "encode"). Matching is
    /// case-insensitive for consistency with the letter processor's
    /// `Method` parsing; previously only lowercase was accepted.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "decode" => Ok(Method::Decode),
            "encode" => Ok(Method::Encode),
            _ => Err(format!("invalid method: {s}")),
        }
    }
}
/// only support string value
///
/// Percent-encodes or percent-decodes string fields.
#[derive(Debug, Default)]
pub struct UrlEncodingProcessor {
    fields: Fields,
    method: Method,
    ignore_missing: bool,
}
impl UrlEncodingProcessor {
    fn with_fields(&mut self, fields: Fields) {
        self.fields = fields;
    }

    fn with_ignore_missing(&mut self, ignore_missing: bool) {
        self.ignore_missing = ignore_missing;
    }

    fn with_method(&mut self, method: Method) {
        self.method = method;
    }

    /// Encodes or decodes `val` and returns a single-entry map keyed by the
    /// field's target (or source) name. Decoding fails on invalid UTF-8 in
    /// the decoded bytes.
    fn process_field(&self, val: &str, field: &Field) -> Result<Map, String> {
        let processed = match self.method {
            Method::Encode => encode(val).to_string(),
            Method::Decode => decode(val).map_err(|e| e.to_string())?.into_owned(),
        };

        let key = match field.target_field {
            Some(ref target_field) => target_field,
            None => field.get_field(),
        };

        Ok(Map::one(key, Value::String(processed)))
    }
}
impl TryFrom<&yaml_rust::yaml::Hash> for UrlEncodingProcessor {
    type Error = String;

    /// Builds a `UrlEncodingProcessor` from a pipeline-config YAML hash;
    /// unknown keys are silently ignored.
    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
        let mut processor = UrlEncodingProcessor::default();

        for (k, v) in value.iter() {
            let key = k
                .as_str()
                .ok_or(format!("key must be a string, but got {k:?}"))?;

            match key {
                FIELD_NAME => {
                    processor.with_fields(Fields::one(yaml_field(v, FIELD_NAME)?));
                }
                FIELDS_NAME => {
                    processor.with_fields(yaml_fields(v, FIELDS_NAME)?);
                }
                IGNORE_MISSING_NAME => {
                    processor.with_ignore_missing(yaml_bool(v, IGNORE_MISSING_NAME)?);
                }
                METHOD_NAME => {
                    // Routed through `FromStr for Method`.
                    let method = yaml_string(v, METHOD_NAME)?;
                    processor.with_method(method.parse()?);
                }
                _ => {}
            }
        }

        Ok(processor)
    }
}
impl crate::etl::processor::Processor for UrlEncodingProcessor {
    fn kind(&self) -> &str {
        PROCESSOR_URL_ENCODING
    }

    fn ignore_missing(&self) -> bool {
        self.ignore_missing
    }

    fn fields(&self) -> &Fields {
        &self.fields
    }

    /// Only string values can be encoded/decoded; anything else is an error.
    fn exec_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
        match val {
            Value::String(val) => self.process_field(val, field),
            _ => Err(format!(
                "{} processor: expect string value, but got {val:?}",
                self.kind()
            )),
        }
    }
}
#[cfg(test)]
mod tests {
    use crate::etl::field::{Field, Fields};
    use crate::etl::processor::urlencoding::UrlEncodingProcessor;
    use crate::etl::value::{Map, Value};

    #[test]
    fn test_decode_url() {
        let field = "url";
        let ff: Field = field.parse().unwrap();

        let decoded = "//BC/[a=6.7.8.9,c=g,k=0,l=1]";
        let encoded = "%2F%2FBC%2F%5Ba%3D6.7.8.9%2Cc%3Dg%2Ck%3D0%2Cl%3D1%5D";

        let mut processor = UrlEncodingProcessor::default();
        processor.with_fields(Fields::one(ff.clone()));

        // Default method is Decode.
        {
            let result = processor.process_field(encoded, &ff).unwrap();
            assert_eq!(Map::one(field, Value::String(decoded.into())), result)
        }
        // Round trip: encoding the decoded form yields the original input.
        {
            processor.with_method(super::Method::Encode);
            let result = processor.process_field(decoded, &ff).unwrap();
            assert_eq!(Map::one(field, Value::String(encoded.into())), result)
        }
    }
}

View File

@@ -0,0 +1,57 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
const INDEX_TIMESTAMP: &str = "timestamp";
const INDEX_TAG: &str = "tag";
const INDEX_FULLTEXT: &str = "fulltext";
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Index {
Timestamp,
Tag,
Fulltext,
}
impl std::fmt::Display for Index {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let index = match self {
Index::Timestamp => INDEX_TIMESTAMP,
Index::Tag => INDEX_TAG,
Index::Fulltext => INDEX_FULLTEXT,
};
write!(f, "{}", index)
}
}
impl TryFrom<String> for Index {
type Error = String;
fn try_from(value: String) -> Result<Self, Self::Error> {
Index::try_from(value.as_str())
}
}
impl TryFrom<&str> for Index {
    type Error = String;

    /// Parses an index name: "timestamp", "tag" or "fulltext" (case-sensitive).
    fn try_from(value: &str) -> Result<Self, Self::Error> {
        match value {
            INDEX_TIMESTAMP => Ok(Index::Timestamp),
            INDEX_TAG => Ok(Index::Tag),
            INDEX_FULLTEXT => Ok(Index::Fulltext),
            _ => Err(format!("unsupported index type: {}", value)),
        }
    }
}

View File

@@ -0,0 +1,205 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod index;
pub mod transformer;
use itertools::Itertools;
use crate::etl::field::Fields;
use crate::etl::processor::{yaml_field, yaml_fields, yaml_string};
use crate::etl::transform::index::Index;
use crate::etl::value::Value;
// YAML keys recognized inside a transform definition
// (see `TryFrom<&yaml_rust::yaml::Hash> for Transform`).
const TRANSFORM_FIELD: &str = "field";
const TRANSFORM_FIELDS: &str = "fields";
const TRANSFORM_TYPE: &str = "type";
const TRANSFORM_INDEX: &str = "index";
const TRANSFORM_DEFAULT: &str = "default";
pub use transformer::greptime::GreptimeTransformer;
// pub use transformer::noop::NoopTransformer;
/// Converts ETL [`Value`]s into a backend-specific output type.
pub trait Transformer: std::fmt::Display + Sized + Send + Sync + 'static {
    /// The backend-specific result of a transformation.
    type Output;

    /// Builds the transformer from the parsed transform definitions.
    fn new(transforms: Transforms) -> Result<Self, String>;
    /// Applies the transforms to one input value.
    fn transform(&self, val: crate::etl::value::Value) -> Result<Self::Output, String>;
}
/// Ordered collection of [`Transform`] definitions.
#[derive(Debug, Default, Clone)]
pub struct Transforms {
    transforms: Vec<Transform>,
}
impl std::fmt::Display for Transforms {
    /// Renders all transforms, comma-separated.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let rendered = self.transforms.iter().map(ToString::to_string).join(", ");
        write!(f, "{}", rendered)
    }
}
// Expose the inner Vec read-only (len, iteration, indexing).
impl std::ops::Deref for Transforms {
    type Target = Vec<Transform>;

    fn deref(&self) -> &Self::Target {
        &self.transforms
    }
}
// Mutable access to the inner Vec (e.g. `push` in GreptimeTransformer::new).
impl std::ops::DerefMut for Transforms {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.transforms
    }
}
impl TryFrom<&Vec<yaml_rust::Yaml>> for Transforms {
    type Error = String;

    /// Parses every YAML document element into a [`Transform`],
    /// failing on the first element that is not a map or does not parse.
    fn try_from(docs: &Vec<yaml_rust::Yaml>) -> Result<Self, Self::Error> {
        let transforms = docs
            .iter()
            .map(|doc| {
                doc.as_hash()
                    .ok_or("transform element must be a map".to_string())?
                    .try_into()
            })
            .collect::<Result<Vec<Transform>, String>>()?;
        Ok(Transforms { transforms })
    }
}
/// One transform definition. Only `fields` is required.
///
/// `type_` is a [`Value`] acting as a type tag, `default` is the optional
/// default value (coerced to `type_`), and `index` optionally marks the
/// column's index kind.
#[derive(Debug, Clone)]
pub struct Transform {
    pub fields: Fields,
    pub type_: Value,
    pub default: Option<Value>,
    pub index: Option<Index>,
}
impl std::fmt::Display for Transform {
    /// Renders as `type: T[, index: I], field(s): F`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "type: {}", self.type_)?;
        if let Some(index) = &self.index {
            write!(f, ", index: {}", index)?;
        }
        write!(f, ", field(s): {}", self.fields)
    }
}
impl Default for Transform {
    // Manual impl: the default `type_` is the explicit `Value::Null` type tag
    // (meaning "no type declared"), which a derive could not express since
    // `Value` itself has no `Default`.
    fn default() -> Self {
        Transform {
            fields: Fields::default(),
            type_: Value::Null,
            default: None,
            index: None,
        }
    }
}
impl Transform {
    fn with_fields(&mut self, fields: Fields) {
        self.fields = fields;
    }

    fn with_type(&mut self, type_: Value) {
        self.type_ = type_;
    }

    /// Sets the default value, coercing it to this transform's declared type.
    ///
    /// Errors if the type has not been set yet (`type_` is still `Value::Null`).
    /// A `Value::Null` default is accepted and simply leaves `default` unset.
    fn try_default(&mut self, default: Value) -> Result<(), String> {
        match (&self.type_, &default) {
            // the type must be declared before a default can be coerced to it
            (Value::Null, _) => Err(format!(
                "transform {} type MUST BE set before default {}",
                self.fields, &default,
            )),
            (_, Value::Null) => Ok(()), // if default is not set, then it will be regarded as default null
            (_, _) => {
                // round-trip the default through its string form so it takes
                // on the declared type
                let target = self
                    .type_
                    .parse_str_value(default.to_str_value().as_str())?;
                self.default = Some(target);
                Ok(())
            }
        }
    }

    fn with_index(&mut self, index: Index) {
        self.index = Some(index);
    }

    pub(crate) fn get_default(&self) -> Option<&Value> {
        self.default.as_ref()
    }
}
impl TryFrom<&yaml_rust::yaml::Hash> for Transform {
    type Error = String;

    /// Parses one transform definition from a YAML map.
    ///
    /// Recognized keys: `field`, `fields`, `type`, `index`, `default`;
    /// unknown keys are silently ignored. The default value is applied after
    /// all keys have been read so the declared type is known when the default
    /// is coerced (see [`Transform::try_default`]).
    fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
        let mut transform = Transform::default();
        // stashed until the whole map has been read; `try_default` needs `type` set first
        let mut default_opt = None;
        for (k, v) in hash {
            let key = k.as_str().ok_or("key must be a string")?;
            match key {
                TRANSFORM_FIELD => {
                    transform.with_fields(Fields::one(yaml_field(v, TRANSFORM_FIELD)?));
                }
                TRANSFORM_FIELDS => {
                    transform.with_fields(yaml_fields(v, TRANSFORM_FIELDS)?);
                }
                TRANSFORM_TYPE => {
                    let t = yaml_string(v, TRANSFORM_TYPE)?;
                    transform.with_type(Value::parse_str_type(&t)?);
                }
                TRANSFORM_INDEX => {
                    let index = yaml_string(v, TRANSFORM_INDEX)?;
                    transform.with_index(index.try_into()?);
                }
                TRANSFORM_DEFAULT => {
                    default_opt = Some(Value::try_from(v)?);
                }
                _ => {}
            }
        }
        if let Some(default) = default_opt {
            transform.try_default(default)?;
        }
        Ok(transform)
    }
}

View File

@@ -0,0 +1,310 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
use crate::etl::transform::index::Index;
use crate::etl::transform::Transform;
use crate::etl::value::{Epoch, Time, Value};
impl TryFrom<Value> for ValueData {
    type Error = String;

    /// Converts an owned ETL [`Value`] into a GreptimeDB proto [`ValueData`].
    ///
    /// `Null` is rejected; `Array`/`Map` are not yet supported and panic.
    // NOTE(review): Int8/Int16 map to `I32Value` and Uint8/Uint16 to `U32Value`
    // here, while the `coerce_*_value` helpers in this file use the dedicated
    // `I8Value`/`U8Value` variants — confirm this asymmetry is intentional.
    fn try_from(value: Value) -> Result<Self, Self::Error> {
        match value {
            Value::Null => Err("Null type not supported".to_string()),

            Value::Int8(v) => Ok(ValueData::I32Value(v as i32)),
            Value::Int16(v) => Ok(ValueData::I32Value(v as i32)),
            Value::Int32(v) => Ok(ValueData::I32Value(v)),
            Value::Int64(v) => Ok(ValueData::I64Value(v)),

            Value::Uint8(v) => Ok(ValueData::U32Value(v as u32)),
            Value::Uint16(v) => Ok(ValueData::U32Value(v as u32)),
            Value::Uint32(v) => Ok(ValueData::U32Value(v)),
            Value::Uint64(v) => Ok(ValueData::U64Value(v)),

            Value::Float32(v) => Ok(ValueData::F32Value(v)),
            Value::Float64(v) => Ok(ValueData::F64Value(v)),

            Value::Boolean(v) => Ok(ValueData::BoolValue(v)),
            // `value` is consumed by this function, so the string can be moved
            // instead of cloned (the `.clone()` here was a redundant allocation)
            Value::String(v) => Ok(ValueData::StringValue(v)),

            Value::Time(Time { nanosecond, .. }) => Ok(ValueData::TimeNanosecondValue(nanosecond)),

            Value::Epoch(Epoch::Nanosecond(ns)) => Ok(ValueData::TimestampNanosecondValue(ns)),
            Value::Epoch(Epoch::Microsecond(us)) => Ok(ValueData::TimestampMicrosecondValue(us)),
            Value::Epoch(Epoch::Millisecond(ms)) => Ok(ValueData::TimestampMillisecondValue(ms)),
            Value::Epoch(Epoch::Second(s)) => Ok(ValueData::TimestampSecondValue(s)),

            Value::Array(_) => unimplemented!("Array type not supported"),
            Value::Map(_) => unimplemented!("Object type not supported"),
        }
    }
}
// TODO(yuanbohan): add fulltext support in datatype_extension
/// Builds one proto [`ColumnSchema`] per target field of `transform`.
///
/// All fields of a single transform share the same data type and semantic
/// type, so both are computed once up front instead of once per field.
/// (`Fields` is validated as non-empty on construction, so hoisting the
/// fallible `coerce_type` call does not change observable behavior.)
pub(crate) fn coerce_columns(transform: &Transform) -> Result<Vec<ColumnSchema>, String> {
    let datatype = coerce_type(transform)? as i32;
    let semantic_type = coerce_semantic_type(transform) as i32;

    let columns = transform
        .fields
        .iter()
        .map(|field| ColumnSchema {
            column_name: field.get_target_field().to_string(),
            datatype,
            semantic_type,
            datatype_extension: None,
        })
        .collect();

    Ok(columns)
}
/// Maps the transform's optional index to the proto semantic type;
/// no index means an ordinary field column.
fn coerce_semantic_type(transform: &Transform) -> SemanticType {
    transform.index.map_or(SemanticType::Field, |index| match index {
        Index::Tag => SemanticType::Tag,
        Index::Timestamp => SemanticType::Timestamp,
        Index::Fulltext => unimplemented!("Fulltext"),
    })
}
/// Maps the transform's declared type (a [`Value`] acting as a type tag) to
/// the GreptimeDB column data type.
///
/// `Null` is an error because no type can be inferred from it;
/// `Array`/`Map` are not yet supported and panic.
fn coerce_type(transform: &Transform) -> Result<ColumnDataType, String> {
    match transform.type_ {
        Value::Int8(_) => Ok(ColumnDataType::Int8),
        Value::Int16(_) => Ok(ColumnDataType::Int16),
        Value::Int32(_) => Ok(ColumnDataType::Int32),
        Value::Int64(_) => Ok(ColumnDataType::Int64),

        Value::Uint8(_) => Ok(ColumnDataType::Uint8),
        Value::Uint16(_) => Ok(ColumnDataType::Uint16),
        Value::Uint32(_) => Ok(ColumnDataType::Uint32),
        Value::Uint64(_) => Ok(ColumnDataType::Uint64),

        Value::Float32(_) => Ok(ColumnDataType::Float32),
        Value::Float64(_) => Ok(ColumnDataType::Float64),

        Value::Boolean(_) => Ok(ColumnDataType::Boolean),
        Value::String(_) => Ok(ColumnDataType::String),

        Value::Time(_) => Ok(ColumnDataType::TimestampNanosecond),

        Value::Epoch(Epoch::Nanosecond(_)) => Ok(ColumnDataType::TimestampNanosecond),
        Value::Epoch(Epoch::Microsecond(_)) => Ok(ColumnDataType::TimestampMicrosecond),
        Value::Epoch(Epoch::Millisecond(_)) => Ok(ColumnDataType::TimestampMillisecond),
        Value::Epoch(Epoch::Second(_)) => Ok(ColumnDataType::TimestampSecond),

        Value::Array(_) => unimplemented!("Array"),
        Value::Map(_) => unimplemented!("Object"),

        Value::Null => Err(format!(
            "Null type not supported when to coerce '{}' type",
            transform.fields
        )),
    }
}
/// Coerces `val` to `transform`'s declared type, as a proto value.
///
/// `Ok(None)` means "no value for this cell" (null input). Time/Epoch inputs
/// pass through as timestamp values regardless of the declared type; every
/// other input kind dispatches to a per-kind helper below, which then
/// switches on the *target* type.
pub(crate) fn coerce_value(
    val: &Value,
    transform: &Transform,
) -> Result<Option<ValueData>, String> {
    match val {
        Value::Null => Ok(None),

        Value::Int8(n) => coerce_i64_value(*n as i64, transform),
        Value::Int16(n) => coerce_i64_value(*n as i64, transform),
        Value::Int32(n) => coerce_i64_value(*n as i64, transform),
        Value::Int64(n) => coerce_i64_value(*n, transform),

        Value::Uint8(n) => coerce_u64_value(*n as u64, transform),
        Value::Uint16(n) => coerce_u64_value(*n as u64, transform),
        Value::Uint32(n) => coerce_u64_value(*n as u64, transform),
        Value::Uint64(n) => coerce_u64_value(*n, transform),

        Value::Float32(n) => coerce_f64_value(*n as f64, transform),
        Value::Float64(n) => coerce_f64_value(*n, transform),

        Value::Boolean(b) => coerce_bool_value(*b, transform),
        Value::String(s) => coerce_string_value(s, transform),

        Value::Time(Time { nanosecond, .. }) => {
            Ok(Some(ValueData::TimestampNanosecondValue(*nanosecond)))
        }

        Value::Epoch(Epoch::Nanosecond(ns)) => Ok(Some(ValueData::TimestampNanosecondValue(*ns))),
        Value::Epoch(Epoch::Microsecond(us)) => Ok(Some(ValueData::TimestampMicrosecondValue(*us))),
        Value::Epoch(Epoch::Millisecond(ms)) => Ok(Some(ValueData::TimestampMillisecondValue(*ms))),
        Value::Epoch(Epoch::Second(s)) => Ok(Some(ValueData::TimestampSecondValue(*s))),

        Value::Array(_) => unimplemented!("Array type not supported"),
        Value::Map(_) => unimplemented!("Object type not supported"),
    }
}
/// Coerces a boolean input to the transform's target type.
///
/// Numeric targets get 0/1, string gets "true"/"false"; Time/Epoch targets
/// are errors; a `Null` target yields `None`.
fn coerce_bool_value(b: bool, transform: &Transform) -> Result<Option<ValueData>, String> {
    let val = match transform.type_ {
        Value::Int8(_) => ValueData::I8Value(b as i32),
        Value::Int16(_) => ValueData::I16Value(b as i32),
        Value::Int32(_) => ValueData::I32Value(b as i32),
        Value::Int64(_) => ValueData::I64Value(b as i64),

        Value::Uint8(_) => ValueData::U8Value(b as u32),
        Value::Uint16(_) => ValueData::U16Value(b as u32),
        Value::Uint32(_) => ValueData::U32Value(b as u32),
        Value::Uint64(_) => ValueData::U64Value(b as u64),

        Value::Float32(_) => ValueData::F32Value(if b { 1.0 } else { 0.0 }),
        Value::Float64(_) => ValueData::F64Value(if b { 1.0 } else { 0.0 }),

        Value::Boolean(_) => ValueData::BoolValue(b),
        Value::String(_) => ValueData::StringValue(b.to_string()),

        Value::Time(_) => return Err("Boolean type not supported for Time".to_string()),
        Value::Epoch(_) => return Err("Boolean type not supported for Epoch".to_string()),

        Value::Array(_) => unimplemented!("Array type not supported"),
        Value::Map(_) => unimplemented!("Object type not supported"),

        Value::Null => return Ok(None),
    };

    Ok(Some(val))
}
/// Coerces a signed-integer input to the transform's target type.
///
/// Narrowing conversions use `as` casts, which may wrap silently; bool is
/// `n != 0`; Time/Epoch targets are errors; a `Null` target yields `None`.
fn coerce_i64_value(n: i64, transform: &Transform) -> Result<Option<ValueData>, String> {
    let val = match transform.type_ {
        Value::Int8(_) => ValueData::I8Value(n as i32),
        Value::Int16(_) => ValueData::I16Value(n as i32),
        Value::Int32(_) => ValueData::I32Value(n as i32),
        Value::Int64(_) => ValueData::I64Value(n),

        Value::Uint8(_) => ValueData::U8Value(n as u32),
        Value::Uint16(_) => ValueData::U16Value(n as u32),
        Value::Uint32(_) => ValueData::U32Value(n as u32),
        Value::Uint64(_) => ValueData::U64Value(n as u64),

        Value::Float32(_) => ValueData::F32Value(n as f32),
        Value::Float64(_) => ValueData::F64Value(n as f64),

        Value::Boolean(_) => ValueData::BoolValue(n != 0),
        Value::String(_) => ValueData::StringValue(n.to_string()),

        Value::Time(_) => return Err("Integer type not supported for Time".to_string()),
        Value::Epoch(_) => return Err("Integer type not supported for Epoch".to_string()),

        Value::Array(_) => unimplemented!("Array type not supported"),
        Value::Map(_) => unimplemented!("Object type not supported"),

        Value::Null => return Ok(None),
    };

    Ok(Some(val))
}
/// Coerces an unsigned-integer input to the transform's target type.
///
/// Narrowing conversions use `as` casts, which may wrap silently; bool is
/// `n != 0`; Time/Epoch targets are errors; a `Null` target yields `None`.
fn coerce_u64_value(n: u64, transform: &Transform) -> Result<Option<ValueData>, String> {
    let val = match transform.type_ {
        Value::Int8(_) => ValueData::I8Value(n as i32),
        Value::Int16(_) => ValueData::I16Value(n as i32),
        Value::Int32(_) => ValueData::I32Value(n as i32),
        Value::Int64(_) => ValueData::I64Value(n as i64),

        Value::Uint8(_) => ValueData::U8Value(n as u32),
        Value::Uint16(_) => ValueData::U16Value(n as u32),
        Value::Uint32(_) => ValueData::U32Value(n as u32),
        Value::Uint64(_) => ValueData::U64Value(n),

        Value::Float32(_) => ValueData::F32Value(n as f32),
        Value::Float64(_) => ValueData::F64Value(n as f64),

        Value::Boolean(_) => ValueData::BoolValue(n != 0),
        Value::String(_) => ValueData::StringValue(n.to_string()),

        Value::Time(_) => return Err("Integer type not supported for Time".to_string()),
        Value::Epoch(_) => return Err("Integer type not supported for Epoch".to_string()),

        Value::Array(_) => unimplemented!("Array type not supported"),
        Value::Map(_) => unimplemented!("Object type not supported"),

        Value::Null => return Ok(None),
    };

    Ok(Some(val))
}
/// Coerces a floating-point input to the transform's target type.
///
/// Float-to-integer `as` casts saturate at the target type's bounds; bool is
/// `n != 0.0`; Time/Epoch targets are errors; a `Null` target yields `None`.
fn coerce_f64_value(n: f64, transform: &Transform) -> Result<Option<ValueData>, String> {
    let val = match transform.type_ {
        Value::Int8(_) => ValueData::I8Value(n as i32),
        Value::Int16(_) => ValueData::I16Value(n as i32),
        Value::Int32(_) => ValueData::I32Value(n as i32),
        Value::Int64(_) => ValueData::I64Value(n as i64),

        Value::Uint8(_) => ValueData::U8Value(n as u32),
        Value::Uint16(_) => ValueData::U16Value(n as u32),
        Value::Uint32(_) => ValueData::U32Value(n as u32),
        Value::Uint64(_) => ValueData::U64Value(n as u64),

        Value::Float32(_) => ValueData::F32Value(n as f32),
        Value::Float64(_) => ValueData::F64Value(n),

        Value::Boolean(_) => ValueData::BoolValue(n != 0.0),
        Value::String(_) => ValueData::StringValue(n.to_string()),

        Value::Time(_) => return Err("Float type not supported for Time".to_string()),
        Value::Epoch(_) => return Err("Float type not supported for Epoch".to_string()),

        Value::Array(_) => unimplemented!("Array type not supported"),
        Value::Map(_) => unimplemented!("Object type not supported"),

        Value::Null => return Ok(None),
    };

    Ok(Some(val))
}
/// Coerces a string input to the transform's target type via `str::parse`.
///
/// Sub-32-bit integer targets parse as i32/u32 (the proto stores them in
/// i32/u32 containers); parse failures become stringified errors; Time/Epoch
/// targets are errors; a `Null` target yields `None`.
fn coerce_string_value(s: &str, transform: &Transform) -> Result<Option<ValueData>, String> {
    let val = match transform.type_ {
        Value::Int8(_) => ValueData::I8Value(s.parse::<i32>().map_err(|e| e.to_string())?),
        Value::Int16(_) => ValueData::I16Value(s.parse::<i32>().map_err(|e| e.to_string())?),
        Value::Int32(_) => ValueData::I32Value(s.parse::<i32>().map_err(|e| e.to_string())?),
        Value::Int64(_) => ValueData::I64Value(s.parse::<i64>().map_err(|e| e.to_string())?),

        Value::Uint8(_) => ValueData::U8Value(s.parse::<u32>().map_err(|e| e.to_string())?),
        Value::Uint16(_) => ValueData::U16Value(s.parse::<u32>().map_err(|e| e.to_string())?),
        Value::Uint32(_) => ValueData::U32Value(s.parse::<u32>().map_err(|e| e.to_string())?),
        Value::Uint64(_) => ValueData::U64Value(s.parse::<u64>().map_err(|e| e.to_string())?),

        Value::Float32(_) => ValueData::F32Value(s.parse::<f32>().map_err(|e| e.to_string())?),
        Value::Float64(_) => ValueData::F64Value(s.parse::<f64>().map_err(|e| e.to_string())?),

        Value::Boolean(_) => ValueData::BoolValue(s.parse::<bool>().map_err(|e| e.to_string())?),
        Value::String(_) => ValueData::StringValue(s.to_string()),

        Value::Time(_) => return Err("String type not supported for Time".to_string()),
        Value::Epoch(_) => return Err("String type not supported for Epoch".to_string()),

        Value::Array(_) => unimplemented!("Array type not supported"),
        Value::Map(_) => unimplemented!("Object type not supported"),

        Value::Null => return Ok(None),
    };

    Ok(Some(val))
}

View File

@@ -0,0 +1,172 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod coerce;
use std::collections::HashSet;
use coerce::{coerce_columns, coerce_value};
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use crate::etl::field::{Field, Fields};
use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transformer, Transforms};
use crate::etl::value::{Array, Epoch, Map, Value};
const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
/// Transformer producing GreptimeDB proto `Rows` from ETL values.
///
/// Fields not in the configured columns are discarded,
/// to prevent automatic column creation in GreptimeDB.
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
    transforms: Transforms,
}
impl GreptimeTransformer {
    /// Builds the implicit timestamp transform used when the pipeline declares
    /// no timestamp column: a `greptime_timestamp` column whose default is
    /// "now" in nanoseconds, captured at construction time.
    fn default_greptime_timestamp_column() -> Transform {
        let now_ns = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0);
        let type_ = Value::Epoch(Epoch::Nanosecond(now_ns));

        Transform {
            fields: Fields::new(vec![Field::new(DEFAULT_GREPTIME_TIMESTAMP_COLUMN)]).unwrap(),
            default: Some(type_.clone()),
            type_,
            index: Some(Index::Timestamp),
        }
    }

    /// Collects the proto column schemas of every transform, in order.
    fn schemas(&self) -> Result<Vec<ColumnSchema>, String> {
        let mut columns = vec![];
        for transform in self.transforms.iter() {
            columns.extend(coerce_columns(transform)?);
        }
        Ok(columns)
    }

    /// Converts one input map into a row; cell order follows the transforms.
    /// A missing key falls back to the transform's default, or to a null cell.
    fn transform_map(&self, map: &Map) -> Result<Row, String> {
        let mut values = vec![];

        for transform in self.transforms.iter() {
            for field in transform.fields.iter() {
                let value_data = match (map.get(field.get_field()), transform.get_default()) {
                    (Some(val), _) => coerce_value(val, transform)?,
                    (None, Some(default)) => coerce_value(default, transform)?,
                    (None, None) => None,
                };
                values.push(GreptimeValue { value_data });
            }
        }

        Ok(Row { values })
    }

    /// Converts an array of maps into rows; any non-map element is an error.
    fn transform_array(&self, arr: &Array) -> Result<Vec<Row>, String> {
        arr.iter()
            .map(|v| match v {
                Value::Map(map) => self.transform_map(map),
                _ => Err(format!("Expected map, found: {v:?}")),
            })
            .collect()
    }
}
impl std::fmt::Display for GreptimeTransformer {
    /// Two-line summary: the transformer name and its column transforms.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "GreptimeTransformer.\nColumns: {}\n", self.transforms)
    }
}
impl Transformer for GreptimeTransformer {
    type Output = Rows;

    /// Validates the transforms and builds the transformer.
    ///
    /// Enforced invariants:
    /// - at least one transform;
    /// - target column names are unique across all transforms;
    /// - at most one column carries the timestamp index, and a
    ///   timestamp-indexed transform must name exactly one field;
    /// - if no timestamp column is declared at all, a default
    ///   `greptime_timestamp` column is appended.
    fn new(mut transforms: Transforms) -> Result<Self, String> {
        if transforms.is_empty() {
            return Err("transform cannot be empty".to_string());
        }

        let mut column_names_set = HashSet::new();
        let mut timestamp_columns = vec![];

        for transform in transforms.iter() {
            let target_fields_set = transform
                .fields
                .iter()
                .map(|f| f.get_target_field())
                .collect::<HashSet<_>>();

            // reject any column name already claimed by a previous transform
            let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
            if !intersections.is_empty() {
                let duplicates = intersections.iter().join(",");
                return Err(format!(
                    "column name must be unique, but got duplicated: {duplicates}"
                ));
            }

            column_names_set.extend(target_fields_set);

            if let Some(idx) = transform.index {
                if idx == Index::Timestamp {
                    match transform.fields.len() {
                        // a timestamp-indexed transform must target exactly one field
                        1 => timestamp_columns.push(transform.fields.first().unwrap().get_field()),
                        _ => return Err(format!(
                            "Illegal to set multiple timestamp Index columns, please set only one: {}",
                            transform.fields.get_target_fields().join(", ")
                        )),
                    }
                }
            }
        }

        match timestamp_columns.len() {
            0 => {
                // no explicit timestamp column: append the implicit default one
                transforms.push(GreptimeTransformer::default_greptime_timestamp_column());
                Ok(GreptimeTransformer { transforms })
            }
            1 => Ok(GreptimeTransformer { transforms }),
            _ => {
                let columns: String = timestamp_columns.iter().map(|s| s.to_string()).join(", ");
                let count = timestamp_columns.len();
                Err(
                    format!("transform must have exactly one field specified as timestamp Index, but got {count}: {columns}")
                )
            }
        }
    }

    /// Transforms a single map into one row, or an array of maps into many
    /// rows; any other input kind is an error.
    fn transform(&self, value: Value) -> Result<Self::Output, String> {
        let schema = self.schemas()?;
        match value {
            Value::Map(map) => {
                let rows = vec![self.transform_map(&map)?];
                Ok(Rows { schema, rows })
            }
            Value::Array(arr) => {
                let rows = self.transform_array(&arr)?;
                Ok(Rows { schema, rows })
            }
            _ => Err(format!("Expected map or array, found: {}", value)),
        }
    }
}

View File

@@ -0,0 +1,16 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod greptime;
pub mod noop;

View File

@@ -0,0 +1,36 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::etl::transform::{Transformer, Transforms};
use crate::etl::value::Value;
pub struct NoopTransformer;
impl std::fmt::Display for NoopTransformer {
    /// Renders the fixed transformer name.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        f.write_str("NoopTransformer")
    }
}
impl Transformer for NoopTransformer {
    type Output = Value;

    /// Ignores the transform definitions entirely.
    fn new(_transforms: Transforms) -> Result<Self, String> {
        Ok(NoopTransformer)
    }

    /// Identity: passes `val` through untouched.
    fn transform(&self, val: Value) -> Result<Self::Output, String> {
        Ok(val)
    }
}

View File

@@ -0,0 +1,56 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::etl::value::Value;
/// Ordered list of ETL [`Value`]s (the `array` value kind).
#[derive(Debug, Clone, PartialEq, Default)]
pub struct Array {
    pub values: Vec<Value>,
}
impl Array {
pub fn new() -> Self {
Array { values: vec![] }
}
}
impl std::fmt::Display for Array {
    /// Renders as `[v1, v2, ...]` using each element's `Display`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let rendered: Vec<String> = self.values.iter().map(ToString::to_string).collect();
        write!(f, "[{}]", rendered.join(", "))
    }
}
// Expose the inner Vec read-only (iteration, len, indexing).
impl std::ops::Deref for Array {
    type Target = Vec<Value>;

    fn deref(&self) -> &Self::Target {
        &self.values
    }
}
// Consume the array, yielding owned values in order.
impl IntoIterator for Array {
    type Item = Value;
    type IntoIter = std::vec::IntoIter<Value>;

    fn into_iter(self) -> Self::IntoIter {
        self.values.into_iter()
    }
}

View File

@@ -0,0 +1,64 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use crate::etl::value::Value;
/// String-keyed collection of ETL [`Value`]s (the `map` value kind).
/// Backed by a `HashMap`, so iteration order is unspecified.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct Map {
    pub values: HashMap<String, Value>,
}
impl Map {
pub fn one(key: impl Into<String>, value: Value) -> Map {
let mut map = Map::default();
map.insert(key, value);
map
}
pub fn insert(&mut self, key: impl Into<String>, value: Value) {
self.values.insert(key.into(), value);
}
pub fn extend(&mut self, Map { values }: Map) {
self.values.extend(values);
}
}
// Wrap an existing HashMap without copying.
impl From<HashMap<String, Value>> for Map {
    fn from(values: HashMap<String, Value>) -> Self {
        Map { values }
    }
}
// Expose the inner HashMap read-only (get, len, iteration).
impl std::ops::Deref for Map {
    type Target = HashMap<String, Value>;

    fn deref(&self) -> &Self::Target {
        &self.values
    }
}
impl std::fmt::Display for Map {
    /// Renders as `{k1: v1, k2: v2, ...}`; entry order follows the underlying
    /// `HashMap` iteration order and is therefore unspecified.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let entries = self
            .values
            .iter()
            .map(|(key, value)| format!("{key}: {value}"))
            .collect::<Vec<_>>()
            .join(", ");
        write!(f, "{{{}}}", entries)
    }
}

View File

@@ -0,0 +1,303 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod array;
pub mod map;
pub mod time;
pub use std::collections::HashMap;
pub use array::Array;
pub use map::Map;
pub use time::{Epoch, Time};
/// Value can be used as a type or as a value.
///
/// Acting as a value: the enclosed data is the actual value.
/// Acting as a type tag: the enclosed data is that type's default value
/// (see [`Value::parse_str_type`]).
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    // as value: null
    // as type: no type specified
    Null,

    Int8(i8),
    Int16(i16),
    Int32(i32),
    Int64(i64),

    Uint8(u8),
    Uint16(u16),
    Uint32(u32),
    Uint64(u64),

    Float32(f32),
    Float64(f64),

    Boolean(bool),
    String(String),

    Time(Time),
    Epoch(Epoch),

    Array(Array),
    Map(Map),
}
impl Value {
pub fn is_null(&self) -> bool {
matches!(self, Value::Null)
}
pub fn parse_str_type(t: &str) -> Result<Self, String> {
let mut parts = t.splitn(2, ',');
let head = parts.next().unwrap_or_default();
let tail = parts.next().map(|s| s.trim().to_string());
match head.to_lowercase().as_str() {
"int8" => Ok(Value::Int8(0)),
"int16" => Ok(Value::Int16(0)),
"int32" => Ok(Value::Int32(0)),
"int64" => Ok(Value::Int64(0)),
"uint8" => Ok(Value::Uint8(0)),
"uint16" => Ok(Value::Uint16(0)),
"uint32" => Ok(Value::Uint32(0)),
"uint64" => Ok(Value::Uint64(0)),
"float32" => Ok(Value::Float32(0.0)),
"float64" => Ok(Value::Float64(0.0)),
"boolean" => Ok(Value::Boolean(false)),
"string" => Ok(Value::String("".to_string())),
"time" => Ok(Value::Time(Time::default())),
"epoch" => match tail {
Some(resolution) if !resolution.is_empty() => match resolution.as_str() {
time::NANOSECOND_RESOLUTION | time::NANO_RESOLUTION | time::NS_RESOLUTION => {
Ok(Value::Epoch(Epoch::Nanosecond(0)))
}
time::MICROSECOND_RESOLUTION | time::MICRO_RESOLUTION | time::US_RESOLUTION => {
Ok(Value::Epoch(Epoch::Microsecond(0)))
}
time::MILLISECOND_RESOLUTION | time::MILLI_RESOLUTION | time::MS_RESOLUTION => {
Ok(Value::Epoch(Epoch::Millisecond(0)))
}
time::SECOND_RESOLUTION | time::SEC_RESOLUTION | time::S_RESOLUTION => {
Ok(Value::Epoch(Epoch::Second(0)))
}
_ => Err(format!(
"invalid resolution: '{resolution}'. Available resolutions: {}",
time::VALID_RESOLUTIONS.join(",")
)),
},
_ => Err(format!(
"resolution MUST BE set for epoch type: '{t}'. Available resolutions: {}",
time::VALID_RESOLUTIONS.join(", ")
)),
},
"array" => Ok(Value::Array(Array::default())),
"map" => Ok(Value::Map(Map::default())),
_ => Err(format!("failed to parse type: '{t}'")),
}
}
/// only support string, bool, number, null
pub fn parse_str_value(&self, v: &str) -> Result<Self, String> {
match self {
Value::Int8(_) => v
.parse::<i8>()
.map(Value::Int8)
.map_err(|e| format!("failed to parse int8: {}", e)),
Value::Int16(_) => v
.parse::<i16>()
.map(Value::Int16)
.map_err(|e| format!("failed to parse int16: {}", e)),
Value::Int32(_) => v
.parse::<i32>()
.map(Value::Int32)
.map_err(|e| format!("failed to parse int32: {}", e)),
Value::Int64(_) => v
.parse::<i64>()
.map(Value::Int64)
.map_err(|e| format!("failed to parse int64: {}", e)),
Value::Uint8(_) => v
.parse::<u8>()
.map(Value::Uint8)
.map_err(|e| format!("failed to parse uint8: {}", e)),
Value::Uint16(_) => v
.parse::<u16>()
.map(Value::Uint16)
.map_err(|e| format!("failed to parse uint16: {}", e)),
Value::Uint32(_) => v
.parse::<u32>()
.map(Value::Uint32)
.map_err(|e| format!("failed to parse uint32: {}", e)),
Value::Uint64(_) => v
.parse::<u64>()
.map(Value::Uint64)
.map_err(|e| format!("failed to parse uint64: {}", e)),
Value::Float32(_) => v
.parse::<f32>()
.map(Value::Float32)
.map_err(|e| format!("failed to parse float32: {}", e)),
Value::Float64(_) => v
.parse::<f64>()
.map(Value::Float64)
.map_err(|e| format!("failed to parse float64: {}", e)),
Value::Boolean(_) => v
.parse::<bool>()
.map(Value::Boolean)
.map_err(|e| format!("failed to parse bool: {}", e)),
Value::String(_) => Ok(Value::String(v.to_string())),
Value::Null => Ok(Value::Null),
_ => Err(format!("default value not unsupported for type {}", self)),
}
}
/// only support string, bool, number, null
pub fn to_str_value(&self) -> String {
match self {
Value::Int8(v) => format!("{}", v),
Value::Int16(v) => format!("{}", v),
Value::Int32(v) => format!("{}", v),
Value::Int64(v) => format!("{}", v),
Value::Uint8(v) => format!("{}", v),
Value::Uint16(v) => format!("{}", v),
Value::Uint32(v) => format!("{}", v),
Value::Uint64(v) => format!("{}", v),
Value::Float32(v) => format!("{}", v),
Value::Float64(v) => format!("{}", v),
Value::Boolean(v) => format!("{}", v),
Value::String(v) => v.to_string(),
v => v.to_string(),
}
}
}
impl std::fmt::Display for Value {
    /// Renders as `typename(inner)`, e.g. `int8(42)`; `Null` renders as
    /// `null` and containers use their own `Display` impls.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Value::Null => write!(f, "null"),

            Value::Int8(v) => write!(f, "int8({})", v),
            Value::Int16(v) => write!(f, "int16({})", v),
            Value::Int32(v) => write!(f, "int32({})", v),
            Value::Int64(v) => write!(f, "int64({})", v),

            Value::Uint8(v) => write!(f, "uint8({})", v),
            Value::Uint16(v) => write!(f, "uint16({})", v),
            Value::Uint32(v) => write!(f, "uint32({})", v),
            Value::Uint64(v) => write!(f, "uint64({})", v),

            Value::Float32(v) => write!(f, "float32({})", v),
            Value::Float64(v) => write!(f, "float64({})", v),

            Value::Boolean(v) => write!(f, "boolean({})", v),
            Value::String(v) => write!(f, "string({})", v),

            Value::Time(v) => write!(f, "time({})", v),
            Value::Epoch(v) => write!(f, "epoch({})", v),

            Value::Array(v) => write!(f, "{}", v),
            Value::Map(v) => write!(f, "{}", v),
        }
    }
}
impl TryFrom<serde_json::Value> for Value {
    type Error = String;

    /// Converts a parsed JSON value into an ETL [`Value`].
    ///
    /// JSON numbers are tried as i64, then u64, then f64; arrays and objects
    /// convert element-wise, failing on the first unconvertible element.
    fn try_from(v: serde_json::Value) -> Result<Self, Self::Error> {
        match v {
            serde_json::Value::Null => Ok(Value::Null),
            serde_json::Value::Bool(b) => Ok(Value::Boolean(b)),
            serde_json::Value::Number(n) => n
                .as_i64()
                .map(Value::Int64)
                .or_else(|| n.as_u64().map(Value::Uint64))
                .or_else(|| n.as_f64().map(Value::Float64))
                .ok_or_else(|| format!("unsupported number type: {}", n)),
            serde_json::Value::String(s) => Ok(Value::String(s)),
            serde_json::Value::Array(arr) => {
                let values = arr
                    .into_iter()
                    .map(Value::try_from)
                    .collect::<Result<Vec<_>, _>>()?;
                Ok(Value::Array(Array { values }))
            }
            serde_json::Value::Object(obj) => {
                let values = obj
                    .into_iter()
                    .map(|(k, v)| Value::try_from(v).map(|v| (k, v)))
                    .collect::<Result<HashMap<_, _>, _>>()?;
                Ok(Value::Map(Map { values }))
            }
        }
    }
}
impl TryFrom<&yaml_rust::Yaml> for Value {
    type Error = String;

    /// Converts a borrowed `yaml_rust::Yaml` node into a pipeline `Value`.
    ///
    /// YAML integers map to `Int64`; reals (which `yaml_rust` keeps as raw
    /// strings) are parsed into `Float64`. Arrays and hashes are converted
    /// recursively. Hash keys must be strings; a non-string key, or an
    /// unsupported node kind (`Alias`, `BadValue`, ...), yields an error.
    fn try_from(v: &yaml_rust::Yaml) -> Result<Self, Self::Error> {
        match v {
            yaml_rust::Yaml::Null => Ok(Value::Null),
            yaml_rust::Yaml::Boolean(v) => Ok(Value::Boolean(*v)),
            yaml_rust::Yaml::Integer(v) => Ok(Value::Int64(*v)),
            yaml_rust::Yaml::Real(v) => {
                // `Real` carries the source text; parse it to f64 here.
                if let Ok(v) = v.parse() {
                    Ok(Value::Float64(v))
                } else {
                    Err(format!("failed to parse float64: {}", v))
                }
            }
            yaml_rust::Yaml::String(v) => Ok(Value::String(v.to_string())),
            yaml_rust::Yaml::Array(arr) => {
                let mut values = vec![];
                for v in arr {
                    values.push(Value::try_from(v)?);
                }
                Ok(Value::Array(Array { values }))
            }
            yaml_rust::Yaml::Hash(v) => {
                let mut values = HashMap::new();
                for (k, v) in v {
                    // Bug fix: report the offending *key* (`k`), not the
                    // entry's value, when the key is not a string.
                    // `ok_or_else` also avoids building the error message
                    // eagerly on the happy path.
                    let key = k
                        .as_str()
                        .ok_or_else(|| format!("key in Hash must be a string, but got {k:?}"))?;
                    values.insert(key.to_string(), Value::try_from(v)?);
                }
                Ok(Value::Map(Map { values }))
            }
            _ => Err(format!("unsupported yaml type: {v:?}")),
        }
    }
}

View File

@@ -0,0 +1,187 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::error;
/// A point in time carried through the pipeline, kept both as its original
/// textual representation and as nanoseconds since the Unix epoch.
#[derive(Debug, Clone, PartialEq)]
pub struct Time {
    // Original textual representation of the timestamp.
    pub value: String,
    // Nanoseconds since the Unix epoch.
    pub nanosecond: i64,
    // Optional format string used to parse/render `value`.
    pub format: Option<String>,
    // Optional timezone name associated with the timestamp.
    pub timezone: Option<String>,
    // TODO(yuanbohan): support locale
    // pub locale: Option<String>,
}

impl Time {
    /// Builds a `Time` from its textual form and its epoch nanoseconds,
    /// leaving `format` and `timezone` unset.
    pub(crate) fn new(v: impl Into<String>, nanosecond: i64) -> Self {
        Self {
            value: v.into(),
            nanosecond,
            format: None,
            timezone: None,
        }
    }

    /// Records the format string used for this timestamp.
    pub(crate) fn with_format(&mut self, format: impl Into<String>) {
        self.format = Some(format.into());
    }

    /// Records (or clears) the timezone associated with this timestamp.
    pub(crate) fn with_timezone(&mut self, timezone: Option<String>) {
        self.timezone = timezone;
    }

    /// Epoch time in nanoseconds (the stored resolution).
    pub(crate) fn timestamp_nanos(&self) -> i64 {
        self.nanosecond
    }

    /// Epoch time in microseconds (truncating division).
    pub(crate) fn timestamp_micros(&self) -> i64 {
        self.nanosecond / 1_000
    }

    /// Epoch time in milliseconds (truncating division).
    pub(crate) fn timestamp_millis(&self) -> i64 {
        self.nanosecond / 1_000_000
    }

    /// Epoch time in whole seconds (truncating division).
    pub(crate) fn timestamp(&self) -> i64 {
        self.nanosecond / 1_000_000_000
    }
}
impl Default for Time {
    /// The current wall-clock time (UTC), rendered as RFC 3339 text.
    ///
    /// Falls back to an epoch of 0 — logging an error — if the current time
    /// cannot be expressed as i64 nanoseconds (`timestamp_nanos_opt`
    /// returns `None` on overflow).
    fn default() -> Self {
        let now = chrono::Utc::now();
        let ns = now.timestamp_nanos_opt().unwrap_or_else(|| {
            error!("failed to get nanosecond from timestamp, use 0 instead");
            0
        });
        Time::new(now.to_rfc3339(), ns)
    }
}
impl std::fmt::Display for Time {
    /// Formats as `<value>[, format: <format>][, timezone: <timezone>]`.
    ///
    /// Bug fix: the previous version also hard-coded a `", format: "` label
    /// in the final `write!`, so the label was duplicated when a format was
    /// set and emitted dangling (with no value) when it was not.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self.value)?;
        if let Some(format) = &self.format {
            write!(f, ", format: {}", format)?;
        }
        if let Some(timezone) = &self.timezone {
            write!(f, ", timezone: {}", timezone)?;
        }
        Ok(())
    }
}
/// An epoch timestamp tagged with the resolution it was captured at.
#[derive(Debug, Clone, PartialEq)]
pub enum Epoch {
    Nanosecond(i64),
    Microsecond(i64),
    Millisecond(i64),
    Second(i64),
}

// Accepted spellings for each resolution in pipeline configuration:
// the canonical name plus its common abbreviations.
pub(crate) const NANOSECOND_RESOLUTION: &str = "nanosecond";
pub(crate) const NANO_RESOLUTION: &str = "nano";
pub(crate) const NS_RESOLUTION: &str = "ns";
pub(crate) const MICROSECOND_RESOLUTION: &str = "microsecond";
pub(crate) const MICRO_RESOLUTION: &str = "micro";
pub(crate) const US_RESOLUTION: &str = "us";
pub(crate) const MILLISECOND_RESOLUTION: &str = "millisecond";
pub(crate) const MILLI_RESOLUTION: &str = "milli";
pub(crate) const MS_RESOLUTION: &str = "ms";
pub(crate) const SECOND_RESOLUTION: &str = "second";
pub(crate) const SEC_RESOLUTION: &str = "sec";
pub(crate) const S_RESOLUTION: &str = "s";

/// Every resolution spelling accepted by the epoch processor.
pub(crate) const VALID_RESOLUTIONS: [&str; 12] = [
    NANOSECOND_RESOLUTION,
    NANO_RESOLUTION,
    NS_RESOLUTION,
    MICROSECOND_RESOLUTION,
    MICRO_RESOLUTION,
    US_RESOLUTION,
    MILLISECOND_RESOLUTION,
    MILLI_RESOLUTION,
    MS_RESOLUTION,
    SECOND_RESOLUTION,
    SEC_RESOLUTION,
    S_RESOLUTION,
];

impl Epoch {
    /// The value scaled up to nanoseconds.
    pub(crate) fn timestamp_nanos(&self) -> i64 {
        match *self {
            Epoch::Nanosecond(v) => v,
            Epoch::Microsecond(v) => v * 1_000,
            Epoch::Millisecond(v) => v * 1_000_000,
            Epoch::Second(v) => v * 1_000_000_000,
        }
    }

    /// The value converted to microseconds (truncating when scaling down).
    pub(crate) fn timestamp_micros(&self) -> i64 {
        match *self {
            Epoch::Nanosecond(v) => v / 1_000,
            Epoch::Microsecond(v) => v,
            Epoch::Millisecond(v) => v * 1_000,
            Epoch::Second(v) => v * 1_000_000,
        }
    }

    /// The value converted to milliseconds (truncating when scaling down).
    pub(crate) fn timestamp_millis(&self) -> i64 {
        match *self {
            Epoch::Nanosecond(v) => v / 1_000_000,
            Epoch::Microsecond(v) => v / 1_000,
            Epoch::Millisecond(v) => v,
            Epoch::Second(v) => v * 1_000,
        }
    }

    /// The value converted to whole seconds (truncating when scaling down).
    pub(crate) fn timestamp(&self) -> i64 {
        match *self {
            Epoch::Nanosecond(v) => v / 1_000_000_000,
            Epoch::Microsecond(v) => v / 1_000_000,
            Epoch::Millisecond(v) => v / 1_000,
            Epoch::Second(v) => v,
        }
    }
}
impl Default for Epoch {
    /// The current wall-clock time at nanosecond resolution.
    fn default() -> Self {
        // `timestamp_nanos_opt` returns None only when "now" overflows i64
        // nanoseconds (far-future clock); fall back to epoch 0 in that case.
        Epoch::Nanosecond(chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0))
    }
}
impl std::fmt::Display for Epoch {
    /// Formats as `<value>, resolution: <canonical resolution name>`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Epoch::Nanosecond(v) => write!(f, "{}, resolution: {}", v, NANOSECOND_RESOLUTION),
            Epoch::Microsecond(v) => write!(f, "{}, resolution: {}", v, MICROSECOND_RESOLUTION),
            Epoch::Millisecond(v) => write!(f, "{}, resolution: {}", v, MILLISECOND_RESOLUTION),
            Epoch::Second(v) => write!(f, "{}, resolution: {}", v, SECOND_RESOLUTION),
        }
    }
}

View File

@@ -0,0 +1,461 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::tracing::info;
use greptime_proto::v1::value::ValueData::{
BoolValue, F64Value, StringValue, TimestampSecondValue, U32Value, U64Value, U8Value,
};
use greptime_proto::v1::Value as GreptimeValue;
use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Value};
// use pipeline::transform::GreptimeTransformer;
// use pipeline::value::Value;
// use pipeline::{parse, Content, Pipeline};
/// End-to-end test: feed one Akamai DataStream2 access-log record (JSON)
/// through a YAML-defined pipeline — urlencoding, epoch, regex and cmcd
/// processors plus type-casting transforms — and assert the exact column
/// values produced for the single output row.
#[test]
fn main() {
    // A single DataStream2 log record; the pipeline expects a batch (array)
    // of rows. All numeric fields arrive as strings and are cast by the
    // transform section of the pipeline below.
    let input_value_str = r#"
[
  {
    "version": 1,
    "streamId": "12345",
    "cp": "123456",
    "reqId": "1239f220",
    "reqTimeSec": "1573840000",
    "bytes": "4995",
    "cliIP": "128.147.28.68",
    "statusCode": "206",
    "proto": "HTTPS",
    "reqHost": "test.hostname.net",
    "reqMethod": "GET",
    "reqPath": "/path1/path2/file.ext",
    "reqPort": "443",
    "rspContentLen": "5000",
    "rspContentType": "text/html",
    "UA": "Mozilla%2F5.0+%28Macintosh%3B+Intel+Mac+OS+X+10_14_3%29",
    "tlsOverheadTimeMSec": "0",
    "tlsVersion": "TLSv1",
    "objSize": "484",
    "uncompressedSize": "484",
    "overheadBytes": "232",
    "totalBytes": "0",
    "queryStr": "cmcd=//1.0@V/bl=21600,br=1426,cid=%22akam-email%22,d=6006,mtp=11100,ot=m,sf=h,sid=%229f36f5c9-d6a2-497b-8c73-4b8f694eab749f36f5c9-d6a2-497b-8c73%22,tb=1426,dl=18500,nor=%22../300kbps/track.m4v%22,nrr=%2212323-48763%22,su,bs,rtp=12000,pr=1.08,sf=d,st=v%22",
    "breadcrumbs": "//BC/%5Ba=23.33.41.20,c=g,k=0,l=1%5D",
    "accLang": "en-US",
    "cookie": "cookie-content",
    "range": "37334-42356",
    "referer": "https%3A%2F%2Ftest.referrer.net%2Fen-US%2Fdocs%2FWeb%2Ftest",
    "xForwardedFor": "8.47.28.38",
    "maxAgeSec": "3600",
    "reqEndTimeMSec": "3",
    "errorCode": "ERR_ACCESS_DENIED|fwd_acl",
    "turnAroundTimeMSec": "11",
    "transferTimeMSec": "125",
    "dnsLookupTimeMSec": "50",
    "lastByte": "1",
    "edgeIP": "23.50.51.173",
    "country": "IN",
    "state": "Virginia",
    "city": "HERNDON",
    "serverCountry": "SG",
    "billingRegion": "8",
    "cacheStatus": "1",
    "securityRules": "ULnR_28976|3900000:3900001:3900005:3900006:BOT-ANOMALY-HEADER|",
    "ewUsageInfo": "//4380/4.0/1/-/0/4/#1,2\\//4380/4.0/4/-/0/4/#0,0\\//4380/4.0/5/-/1/1/#0,0",
    "ewExecutionInfo": "c:4380:7:161:162:161:n:::12473:200|C:4380:3:0:4:0:n:::6967:200|R:4380:20:99:99:1:n:::35982:200",
    "customField": "any-custom-value"
  }
]
"#;
    // Parse the JSON text into the pipeline's own `Value` representation.
    let input_value: Value = serde_json::from_str::<serde_json::Value>(input_value_str)
        .expect("failed to parse input value")
        .try_into()
        .expect("failed to convert input value");

    // Pipeline definition: processors run first (percent-decoding, epoch
    // conversion, regex extraction, CMCD parsing), then the transform
    // section casts/renames extracted fields into typed output columns.
    let pipeline_yaml = r#"
---
description: Pipeline for Akamai DataStream2 Log

processors:
  - urlencoding:
      fields:
        - breadcrumbs
        - UA
        - referer
        - queryStr
      method: decode
      ignore_missing: true
  - epoch:
      field: reqTimeSec
      resolution: second
      ignore_missing: true
  - regex:
      field: breadcrumbs
      patterns:
        - "(?<parent>\\[[^\\[]*c=c[^\\]]*\\])"
        - "(?<edge>\\[[^\\[]*c=g[^\\]]*\\])"
        - "(?<origin>\\[[^\\[]*c=o[^\\]]*\\])"
        - "(?<peer>\\[[^\\[]*c=p[^\\]]*\\])"
        - "(?<cloud_wrapper>\\[[^\\[]*c=w[^\\]]*\\])"
      ignore_missing: true
  - regex:
      fields:
        - breadcrumbs_parent
        - breadcrumbs_edge
        - breadcrumbs_origin
        - breadcrumbs_peer
        - breadcrumbs_cloud_wrapper
      ignore_missing: true
      patterns:
        - "a=(?<ip>[^,\\]]+)"
        - "b=(?<request_id>[^,\\]]+)"
        - "k=(?<request_end_time>[^,\\]]+)"
        - "l=(?<turn_around_time>[^,\\]]+)"
        - "m=(?<dns_lookup_time>[^,\\]]+)"
        - "n=(?<geo>[^,\\]]+)"
        - "o=(?<asn>[^,\\]]+)"
  - regex:
      field: queryStr, cmcd
      patterns:
        - "(?i)CMCD=//(?<version>[\\d\\.]+)@V/(?<data>.+$)"
      ignore_missing: true
  - cmcd:
      field: cmcd_data, cmcd
      ignore_missing: true

transform:
  - fields:
      - breadcrumbs
      - referer
      - queryStr, query_str
      - customField, custom_field
      - reqId, req_id
      - city
      - state
      - country
      - securityRules, security_rules
      - ewUsageInfo, ew_usage_info
      - ewExecutionInfo, ew_execution_info
      - errorCode, error_code
      - xForwardedFor, x_forwarded_for
      - range
      - accLang, acc_lang
      - reqMethod, req_method
      - reqHost, req_host
      - proto
      - cliIP, cli_ip
      - rspContentType, rsp_content_type
      - tlsVersion, tls_version
    type: string
  - fields:
      - version
      - cacheStatus, cache_status
      - lastByte, last_byte
    type: uint8
  - fields:
      - streamId, stream_id
      - billingRegion, billing_region
      - dnsLookupTimeMSec, dns_lookup_time_msec
      - transferTimeMSec, transfer_time_msec
      - turnAroundTimeMSec, turn_around_time_msec
      - reqEndTimeMSec, req_end_time_msec
      - maxAgeSec, max_age_sec
      - reqPort, req_port
      - statusCode, status_code
      - cp
      - tlsOverheadTimeMSec, tls_overhead_time_msec
    type: uint32
  - fields:
      - bytes
      - rspContentLen, rsp_content_len
      - objSize, obj_size
      - uncompressedSize, uncompressed_size
      - overheadBytes, overhead_bytes
      - totalBytes, total_bytes
    type: uint64
  - fields:
      - UA, user_agent
      - cookie
      - reqPath, req_path
    type: string
    # index: fulltext
  - field: reqTimeSec, req_time_sec
    # epoch time is special, the resolution MUST BE specified
    type: epoch, second
    index: timestamp
  # the following is from cmcd
  - fields:
      - cmcd_version
      - cmcd_cid, cmcd_content_id
      - cmcd_nor, cmcd_next_object_requests
      - cmcd_nrr, cmcd_next_range_request
      - cmcd_ot, cmcd_object_type
      - cmcd_sf, cmcd_streaming_format
      - cmcd_sid, cmcd_session_id
      - cmcd_st, cmcd_stream_type
      - cmcd_v
    type: string
  - fields:
      - cmcd_br, cmcd_encoded_bitrate
      - cmcd_bl, cmcd_buffer_length
      - cmcd_d, cmcd_object_duration
      - cmcd_dl, cmcd_deadline
      - cmcd_mtp, cmcd_measured_throughput
      - cmcd_rtp, cmcd_requested_max_throughput
      - cmcd_tb, cmcd_top_bitrate
    type: uint64
  - fields:
      - cmcd_pr, cmcd_playback_rate
    type: float64
  - fields:
      - cmcd_bs, cmcd_buffer_starvation
      - cmcd_su, cmcd_startup
    type: boolean
  # the following is from breadcrumbs
  - fields:
      - breadcrumbs_parent_ip
      - breadcrumbs_parent_request_id
      - breadcrumbs_parent_geo
      - breadcrumbs_edge_ip
      - breadcrumbs_edge_request_id
      - breadcrumbs_edge_geo
      - breadcrumbs_origin_ip
      - breadcrumbs_origin_request_id
      - breadcrumbs_origin_geo
      - breadcrumbs_peer_ip
      - breadcrumbs_peer_request_id
      - breadcrumbs_peer_geo
      - breadcrumbs_cloud_wrapper_ip
      - breadcrumbs_cloud_wrapper_request_id
      - breadcrumbs_cloud_wrapper_geo
    type: string
  - fields:
      - breadcrumbs_parent_request_end_time
      - breadcrumbs_parent_turn_around_time
      - breadcrumbs_parent_dns_lookup_time
      - breadcrumbs_parent_asn
      - breadcrumbs_edge_request_end_time
      - breadcrumbs_edge_turn_around_time
      - breadcrumbs_edge_dns_lookup_time
      - breadcrumbs_edge_asn
      - breadcrumbs_origin_request_end_time
      - breadcrumbs_origin_turn_around_time
      - breadcrumbs_origin_dns_lookup_time
      - breadcrumbs_origin_asn
      - breadcrumbs_peer_request_end_time
      - breadcrumbs_peer_turn_around_time
      - breadcrumbs_peer_dns_lookup_time
      - breadcrumbs_peer_asn
      - breadcrumbs_cloud_wrapper_request_end_time
      - breadcrumbs_cloud_wrapper_turn_around_time
      - breadcrumbs_cloud_wrapper_dns_lookup_time
      - breadcrumbs_cloud_wrapper_asn
    type: uint32
"#;
    // Expected output columns, in schema order. The first tuple element is
    // the column name (documentation only — it is discarded by the `map`
    // below); `None` marks columns the input record does not populate.
    let expected_values = vec![
        (
            "breadcrumbs",
            Some(StringValue("//BC/[a=23.33.41.20,c=g,k=0,l=1]".into())),
        ),
        (
            "referer",
            Some(StringValue(
                "https://test.referrer.net/en-US/docs/Web/test".into(),
            )),
        ),
        (
            "query_str",
            Some(StringValue("cmcd=//1.0@V/bl=21600,br=1426,cid=\"akam-email\",d=6006,mtp=11100,ot=m,sf=h,sid=\"9f36f5c9-d6a2-497b-8c73-4b8f694eab749f36f5c9-d6a2-497b-8c73\",tb=1426,dl=18500,nor=\"../300kbps/track.m4v\",nrr=\"12323-48763\",su,bs,rtp=12000,pr=1.08,sf=d,st=v\"".into())),
        ),
        ("custom_field", Some(StringValue("any-custom-value".into()))),
        ("req_id", Some(StringValue("1239f220".into()))),
        ("city", Some(StringValue("HERNDON".into()))),
        ("state", Some(StringValue("Virginia".into()))),
        ("country", Some(StringValue("IN".into()))),
        (
            "security_rules",
            Some(StringValue(
                "ULnR_28976|3900000:3900001:3900005:3900006:BOT-ANOMALY-HEADER|".into(),
            )),
        ),
        (
            "ew_usage_info",
            Some(StringValue(
                "//4380/4.0/1/-/0/4/#1,2\\//4380/4.0/4/-/0/4/#0,0\\//4380/4.0/5/-/1/1/#0,0".into(),
            )),
        ),
        (
            "ew_execution_info",
            Some(StringValue("c:4380:7:161:162:161:n:::12473:200|C:4380:3:0:4:0:n:::6967:200|R:4380:20:99:99:1:n:::35982:200".into()))),
        (
            "error_code",
            Some(StringValue("ERR_ACCESS_DENIED|fwd_acl".into())),
        ),
        ("x_forwarded_for", Some(StringValue("8.47.28.38".into()))),
        ("range", Some(StringValue("37334-42356".into()))),
        ("acc_lang", Some(StringValue("en-US".into()))),
        ("req_method", Some(StringValue("GET".into()))),
        ("req_host", Some(StringValue("test.hostname.net".into()))),
        ("proto", Some(StringValue("HTTPS".into()))),
        ("cli_ip", Some(StringValue("128.147.28.68".into()))),
        ("rsp_content_type", Some(StringValue("text/html".into()))),
        ("tls_version", Some(StringValue("TLSv1".into()))),
        ("version", Some(U8Value(1))),
        ("cache_status", Some(U8Value(1))),
        ("last_byte", Some(U8Value(1))),
        ("stream_id", Some(U32Value(12345))),
        ("billing_region", Some(U32Value(8))),
        ("dns_lookup_time_msec", Some(U32Value(50))),
        ("transfer_time_msec", Some(U32Value(125))),
        ("turn_around_time_msec", Some(U32Value(11))),
        ("req_end_time_msec", Some(U32Value(3))),
        ("max_age_sec", Some(U32Value(3600))),
        ("req_port", Some(U32Value(443))),
        ("status_code", Some(U32Value(206))),
        ("cp", Some(U32Value(123456))),
        ("tls_overhead_time_msec", Some(U32Value(0))),
        ("bytes", Some(U64Value(4995))),
        ("rsp_content_len", Some(U64Value(5000))),
        ("obj_size", Some(U64Value(484))),
        ("uncompressed_size", Some(U64Value(484))),
        ("overhead_bytes", Some(U64Value(232))),
        ("total_bytes", Some(U64Value(0))),
        (
            "user_agent",
            Some(StringValue(
                "Mozilla/5.0+(Macintosh;+Intel+Mac+OS+X+10_14_3)".into(),
            )),
        ),
        ("cookie", Some(StringValue("cookie-content".into()))),
        (
            "req_path",
            Some(StringValue("/path1/path2/file.ext".into())),
        ),
        ("req_time_sec", Some(TimestampSecondValue(1573840000))),
        ("cmcd_version", Some(StringValue("1.0".into()))),
        (
            "cmcd_content_id",
            Some(StringValue("\"akam-email\"".into())),
        ),
        (
            "cmcd_next_object_requests",
            Some(StringValue("\"../300kbps/track.m4v\"".into())),
        ),
        (
            "cmcd_next_range_request",
            Some(StringValue("\"12323-48763\"".into())),
        ),
        ("cmcd_object_type", Some(StringValue("m".into()))),
        ("cmcd_streaming_format", Some(StringValue("d".into()))),
        (
            "cmcd_session_id",
            Some(StringValue(
                "\"9f36f5c9-d6a2-497b-8c73-4b8f694eab749f36f5c9-d6a2-497b-8c73\"".into(),
            )),
        ),
        ("cmcd_stream_type", Some(StringValue("v\"".into()))),
        ("cmcd_v", None),
        ("cmcd_encoded_bitrate", Some(U64Value(1426))),
        ("cmcd_buffer_length", Some(U64Value(21600))),
        ("cmcd_object_duration", Some(U64Value(6006))),
        ("cmcd_deadline", Some(U64Value(18500))),
        ("cmcd_measured_throughput", Some(U64Value(11100))),
        ("cmcd_requested_max_throughput", Some(U64Value(12000))),
        ("cmcd_top_bitrate", Some(U64Value(1426))),
        ("cmcd_playback_rate", Some(F64Value(1.08))),
        ("cmcd_buffer_starvation", Some(BoolValue(true))),
        ("cmcd_startup", Some(BoolValue(true))),
        ("breadcrumbs_parent_ip", None),
        ("breadcrumbs_parent_request_id", None),
        ("breadcrumbs_parent_geo", None),
        (
            "breadcrumbs_edge_ip",
            Some(StringValue("23.33.41.20".into())),
        ),
        ("breadcrumbs_edge_request_id", None),
        ("breadcrumbs_edge_geo", None),
        ("breadcrumbs_origin_ip", None),
        ("breadcrumbs_origin_request_id", None),
        ("breadcrumbs_origin_geo", None),
        ("breadcrumbs_peer_ip", None),
        ("breadcrumbs_peer_request_id", None),
        ("breadcrumbs_peer_geo", None),
        ("breadcrumbs_cloud_wrapper_ip", None),
        ("breadcrumbs_cloud_wrapper_request_id", None),
        ("breadcrumbs_cloud_wrapper_geo", None),
        ("breadcrumbs_parent_request_end_time", None),
        ("breadcrumbs_parent_turn_around_time", None),
        ("breadcrumbs_parent_dns_lookup_time", None),
        ("breadcrumbs_parent_asn", None),
        ("breadcrumbs_edge_request_end_time", Some(U32Value(0))),
        ("breadcrumbs_edge_turn_around_time", Some(U32Value(1))),
        ("breadcrumbs_edge_dns_lookup_time", None),
        ("breadcrumbs_edge_asn", None),
        ("breadcrumbs_origin_request_end_time", None),
        ("breadcrumbs_origin_turn_around_time", None),
        ("breadcrumbs_origin_dns_lookup_time", None),
        ("breadcrumbs_origin_asn", None),
        ("breadcrumbs_peer_request_end_time", None),
        ("breadcrumbs_peer_turn_around_time", None),
        ("breadcrumbs_peer_dns_lookup_time", None),
        ("breadcrumbs_peer_asn", None),
        ("breadcrumbs_cloud_wrapper_request_end_time", None),
        ("breadcrumbs_cloud_wrapper_turn_around_time", None),
        ("breadcrumbs_cloud_wrapper_dns_lookup_time", None),
        ("breadcrumbs_cloud_wrapper_asn", None),
    ]
    .into_iter()
    .map(|(_, d)| GreptimeValue { value_data: d })
    .collect::<Vec<GreptimeValue>>();

    // Build the pipeline from the YAML definition and run the input batch.
    let yaml_content = Content::Yaml(pipeline_yaml.into());
    let pipeline: Pipeline<GreptimeTransformer> =
        parse(&yaml_content).expect("failed to parse pipeline");
    let output = pipeline.exec(input_value).expect("failed to exec pipeline");

    // One input record must produce exactly one output row, and every
    // column must match the expectation above, in order.
    assert_eq!(output.rows.len(), 1);
    let values = output.rows.first().unwrap().values.clone();
    assert_eq!(expected_values, values);

    // Debug aid: dump the output schema (name, datatype, semantic type).
    for s in output.schema.iter() {
        info!(
            "{}({}): {}",
            s.column_name,
            s.datatype().as_str_name(),
            s.semantic_type().as_str_name()
        );
    }
    info!("\n");

    // Debug aid: dump each row's values paired with their column names.
    let get_schema_name = |ss: &Vec<greptime_proto::v1::ColumnSchema>, i: usize| {
        let s = ss.get(i).unwrap();
        s.column_name.clone()
    };
    for row in output.rows.iter() {
        let values = &row.values;
        for i in 0..values.len() {
            let val = values.get(i).unwrap();
            info!(
                "{}: {:?}, ",
                get_schema_name(&output.schema, i),
                val.value_data
            );
        }
        info!("\n");
    }
}

View File

@@ -2,6 +2,7 @@
Pn = "Pn"
ue = "ue"
worl = "worl"
ot = "ot"
[files]
extend-exclude = [