refactor: Change the error type in the pipeline crate from String to Error (#4763)

* chore: in process

* chore: change pipeline crate error type

* chore: improve event error

* chore: fix by pr comment

* chore: use snafu context replace ok_or_else

* refactor: update snafu usage

---------

Co-authored-by: Ning Sun <sunng@protonmail.com>
This commit is contained in:
localhost
2024-09-26 03:32:34 +08:00
committed by GitHub
parent 0274e752ae
commit 627a326273
26 changed files with 1275 additions and 541 deletions

View File

@@ -13,13 +13,13 @@
// limitations under the License.
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use pipeline::{parse, Content, GreptimeTransformer, Pipeline};
use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Result};
use serde_json::{Deserializer, Value};
fn processor_mut(
pipeline: &Pipeline<GreptimeTransformer>,
input_values: Vec<Value>,
) -> Result<Vec<greptime_proto::v1::Row>, String> {
) -> Result<Vec<greptime_proto::v1::Row>> {
let mut payload = pipeline.init_intermediate_state();
let mut result = Vec::with_capacity(input_values.len());
@@ -30,7 +30,7 @@ fn processor_mut(
pipeline.reset_intermediate_state(&mut payload);
}
Ok::<Vec<greptime_proto::v1::Row>, String>(result)
Ok(result)
}
fn prepare_pipeline() -> Pipeline<GreptimeTransformer> {
@@ -230,7 +230,7 @@ fn criterion_benchmark(c: &mut Criterion) {
let input_value_str = include_str!("./data.log");
let input_value = Deserializer::from_str(input_value_str)
.into_iter::<serde_json::Value>()
.collect::<Result<Vec<_>, _>>()
.collect::<std::result::Result<Vec<_>, _>>()
.unwrap();
let pipeline = prepare_pipeline();
let mut group = c.benchmark_group("pipeline");

View File

@@ -14,6 +14,7 @@
#![allow(dead_code)]
pub mod error;
pub mod field;
pub mod processor;
pub mod transform;
@@ -21,12 +22,16 @@ pub mod value;
use ahash::HashSet;
use common_telemetry::debug;
use error::{IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, YamlLoadSnafu};
use itertools::Itertools;
use processor::{Processor, ProcessorBuilder, Processors};
use snafu::{OptionExt, ResultExt};
use transform::{TransformBuilders, Transformer, Transforms};
use value::Value;
use yaml_rust::YamlLoader;
use crate::etl::error::Result;
const DESCRIPTION: &str = "description";
const PROCESSORS: &str = "processors";
const TRANSFORM: &str = "transform";
@@ -37,13 +42,13 @@ pub enum Content {
Yaml(String),
}
pub fn parse<T>(input: &Content) -> Result<Pipeline<T>, String>
pub fn parse<T>(input: &Content) -> Result<Pipeline<T>>
where
T: Transformer,
{
match input {
Content::Yaml(str) => {
let docs = YamlLoader::load_from_str(str).map_err(|e| e.to_string())?;
let docs = YamlLoader::load_from_str(str).context(YamlLoadSnafu)?;
let doc = &docs[0];
@@ -124,7 +129,7 @@ where
.processor_builders
.into_iter()
.map(|builder| builder.build(&final_intermediate_keys))
.collect::<Result<Vec<_>, _>>()?;
.collect::<Result<Vec<_>>>()?;
let processors = Processors {
processors: processors_kind_list,
required_keys: processors_required_keys.clone(),
@@ -136,7 +141,7 @@ where
.builders
.into_iter()
.map(|builder| builder.build(&final_intermediate_keys, &output_keys))
.collect::<Result<Vec<_>, String>>()?;
.collect::<Result<Vec<_>>>()?;
let transformers = Transforms {
transforms: transfor_list,
@@ -197,7 +202,7 @@ impl<T> Pipeline<T>
where
T: Transformer,
{
pub fn exec_mut(&self, val: &mut Vec<Value>) -> Result<T::VecOutput, String> {
pub fn exec_mut(&self, val: &mut Vec<Value>) -> Result<T::VecOutput> {
for processor in self.processors.iter() {
processor.exec_mut(val)?;
}
@@ -205,7 +210,7 @@ where
self.transformer.transform_mut(val)
}
pub fn prepare(&self, val: serde_json::Value, result: &mut [Value]) -> Result<(), String> {
pub fn prepare(&self, val: serde_json::Value, result: &mut [Value]) -> Result<()> {
match val {
serde_json::Value::Object(map) => {
let mut search_from = 0;
@@ -230,7 +235,7 @@ where
result[0] = val.try_into()?;
}
_ => {
return Err("expect object".to_string());
return PrepareValueMustBeObjectSnafu.fail();
}
}
Ok(())
@@ -274,18 +279,11 @@ where
}
}
pub(crate) fn find_key_index(
intermediate_keys: &[String],
key: &str,
kind: &str,
) -> Result<usize, String> {
pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str) -> Result<usize> {
intermediate_keys
.iter()
.position(|k| k == key)
.ok_or(format!(
"{} processor.{} not found in intermediate keys",
kind, key
))
.context(IntermediateKeyIndexSnafu { kind, key })
}
#[cfg(test)]

View File

@@ -0,0 +1,552 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
// Error type for the pipeline ETL module.
//
// Conventions shared by every variant:
// * The `display` attribute holds the message rendered for the error; fields
//   named in braces are interpolated into it.
// * `location` is marked `#[snafu(implicit)]`, so callers never pass it when
//   building a context selector.
// * A variant that wraps an underlying failure stores it in the field marked
//   `#[snafu(source)]`.
//
// `visibility(pub)` makes the generated `*Snafu` context selectors public so
// the other modules of the crate can import them.
//
// NOTE: plain `//` comments are used on purpose so that no `#[doc]` attributes
// are handed to the derive / `stack_trace_debug` macros.
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
    #[snafu(display("Empty input field"))]
    EmptyInputField {
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("Missing input field"))]
    MissingInputField {
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("Processor must be a map"))]
    ProcessorMustBeMap {
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("{processor} processor: missing field: {field}"))]
    ProcessorMissingField {
        processor: String,
        field: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("{processor} processor: expect string value, but got {v:?}"))]
    ProcessorExpectString {
        processor: String,
        v: crate::etl::Value,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("{processor} processor: unsupported value {val}"))]
    ProcessorUnsupportedValue {
        processor: &'static str,
        val: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("processor key must be a string"))]
    ProcessorKeyMustBeString {
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("{kind} processor: failed to parse {value}"))]
    ProcessorFailedToParseString {
        kind: String,
        value: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("processor must have a string key"))]
    ProcessorMustHaveStringKey {
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("unsupported {processor} processor"))]
    UnsupportedProcessor {
        processor: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("Field {field} must be a {ty}"))]
    FieldMustBeType {
        field: String,
        ty: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("Field parse from string failed: {field}"))]
    FailedParseFieldFromString {
        #[snafu(source)]
        error: Box<dyn std::error::Error + Send + Sync>,
        field: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("failed to parse {key} as int: {value}"))]
    FailedToParseIntKey {
        key: String,
        value: String,
        #[snafu(source)]
        error: std::num::ParseIntError,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("Failed to parse {value} to int"))]
    FailedToParseInt {
        value: String,
        #[snafu(source)]
        error: std::num::ParseIntError,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("failed to parse {key} as float: {value}"))]
    FailedToParseFloatKey {
        key: String,
        value: String,
        #[snafu(source)]
        error: std::num::ParseFloatError,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("{kind} processor.{key} not found in intermediate keys"))]
    IntermediateKeyIndex {
        kind: String,
        key: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("{k} missing value in {s}"))]
    CmcdMissingValue {
        k: String,
        s: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("{part} missing key in {s}"))]
    CmcdMissingKey {
        part: String,
        s: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("key must be a string, but got {k:?}"))]
    KeyMustBeString {
        k: yaml_rust::Yaml,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("csv read error"))]
    CsvRead {
        #[snafu(implicit)]
        location: Location,
        #[snafu(source)]
        error: csv::Error,
    },
    #[snafu(display("expected at least one record from csv format, but got none"))]
    CsvNoRecord {
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("'{separator}' must be a single character, but got '{value}'"))]
    CsvSeparatorName {
        separator: &'static str,
        value: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("'{quote}' must be a single character, but got '{value}'"))]
    CsvQuoteName {
        quote: &'static str,
        value: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("Parse date timezone error {value}"))]
    DateParseTimezone {
        value: String,
        #[snafu(source)]
        error: chrono_tz::ParseError,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("Parse date error {value}"))]
    DateParse {
        value: String,
        #[snafu(source)]
        error: chrono::ParseError,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("failed to get local timezone"))]
    DateFailedToGetLocalTimezone {
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("failed to get timestamp"))]
    DateFailedToGetTimestamp {
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("{processor} processor: invalid format {s}"))]
    DateInvalidFormat {
        s: String,
        processor: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("Invalid Pattern: '{s}'. {detail}"))]
    DissectInvalidPattern {
        s: String,
        detail: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("Empty pattern is not allowed"))]
    DissectEmptyPattern {
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("'{split}' exceeds the input"))]
    DissectSplitExceedsInput {
        split: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("'{split}' does not match the input '{input}'"))]
    DissectSplitNotMatchInput {
        split: String,
        input: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("consecutive names are not allowed: '{name1}' '{name2}'"))]
    DissectConsecutiveNames {
        name1: String,
        name2: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("No matching pattern found"))]
    DissectNoMatchingPattern {
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("'{m}' modifier already set, but found {modifier}"))]
    DissectModifierAlreadySet {
        m: String,
        modifier: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("Append Order modifier is already set to '{n}', cannot be set to {order}"))]
    DissectAppendOrderAlreadySet {
        n: String,
        order: u32,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("Order can only be set to Append Modifier, current modifier is {m}"))]
    DissectOrderOnlyAppend {
        m: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("Order can only be set to Append Modifier"))]
    DissectOrderOnlyAppendModifier {
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("End modifier already set: '{m}'"))]
    DissectEndModifierAlreadySet {
        m: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("invalid resolution: {resolution}"))]
    EpochInvalidResolution {
        resolution: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("pattern is required"))]
    GsubPatternRequired {
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("replacement is required"))]
    GsubReplacementRequired {
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("invalid regex pattern: {pattern}"))]
    Regex {
        #[snafu(source)]
        error: regex::Error,
        pattern: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("separator is required"))]
    JoinSeparatorRequired {
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("invalid method: {method}"))]
    LetterInvalidMethod {
        method: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("no named group found in regex {origin}"))]
    RegexNamedGroupNotFound {
        origin: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("no valid field found in {processor} processor"))]
    RegexNoValidField {
        processor: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("no valid pattern found in {processor} processor"))]
    RegexNoValidPattern {
        processor: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("invalid method: {s}"))]
    UrlEncodingInvalidMethod {
        s: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("url decoding error"))]
    UrlEncodingDecode {
        #[snafu(source)]
        error: std::string::FromUtf8Error,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("invalid transform on_failure value: {value}"))]
    TransformOnFailureInvalidValue {
        value: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("transform element must be a map"))]
    TransformElementMustBeMap {
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("transform {fields:?} type MUST BE set before default {default}"))]
    TransformTypeMustBeSet {
        fields: String,
        default: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("transform cannot be empty"))]
    TransformEmpty {
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("column name must be unique, but got duplicated: {duplicates}"))]
    TransformColumnNameMustBeUnique {
        duplicates: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display(
        "Illegal to set multiple timestamp Index columns, please set only one: {columns}"
    ))]
    TransformMultipleTimestampIndex {
        columns: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("transform must have exactly one field specified as timestamp Index, but got {count}: {columns}"))]
    TransformTimestampIndexCount {
        count: usize,
        columns: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("Null type not supported"))]
    CoerceUnsupportedNullType {
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("Null type not supported when to coerce '{ty}' type"))]
    CoerceUnsupportedNullTypeTo {
        ty: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("{ty} value not supported for Epoch"))]
    CoerceUnsupportedEpochType {
        ty: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("failed to coerce string value '{s}' to type '{ty}'"))]
    CoerceStringToType {
        s: String,
        ty: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display(
        "invalid resolution: '{resolution}'. Available resolutions: {valid_resolution}"
    ))]
    ValueInvalidResolution {
        resolution: String,
        valid_resolution: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("failed to parse type: '{t}'"))]
    ValueParseType {
        t: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("failed to parse {ty}: {v}"))]
    ValueParseInt {
        ty: String,
        v: String,
        #[snafu(source)]
        error: std::num::ParseIntError,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("failed to parse {ty}: {v}"))]
    ValueParseFloat {
        ty: String,
        v: String,
        #[snafu(source)]
        error: std::num::ParseFloatError,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("failed to parse {ty}: {v}"))]
    ValueParseBoolean {
        ty: String,
        v: String,
        #[snafu(source)]
        error: std::str::ParseBoolError,
        #[snafu(implicit)]
        location: Location,
    },
    // The message previously read "not unsupported", a double negative that
    // stated the opposite of what this variant reports.
    #[snafu(display("default value not supported for type {value}"))]
    ValueDefaultValueUnsupported {
        value: String,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("unsupported number type: {value}"))]
    ValueUnsupportedNumberType {
        value: serde_json::Number,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("unsupported yaml type: {value:?}"))]
    ValueUnsupportedYamlType {
        value: yaml_rust::Yaml,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("key in Hash must be a string, but got {value:?}"))]
    ValueYamlKeyMustBeString {
        value: yaml_rust::Yaml,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("Yaml load error."))]
    YamlLoad {
        #[snafu(source)]
        error: yaml_rust::ScanError,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("Prepare value must be an object"))]
    PrepareValueMustBeObject {
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("Column options error"))]
    ColumnOptions {
        #[snafu(source)]
        source: api::error::Error,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("unsupported index type: {value}"))]
    UnsupportedIndexType {
        value: String,
        #[snafu(implicit)]
        location: Location,
    },
}
/// Shorthand for `std::result::Result` with this module's [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
    /// Exposes the error as `&dyn Any` so callers can downcast it.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Reports `StatusCode::InvalidArguments` for every variant; the code
    /// does not depend on which error occurred.
    fn status_code(&self) -> StatusCode {
        StatusCode::InvalidArguments
    }
}

View File

@@ -15,6 +15,10 @@
use std::ops::Deref;
use std::str::FromStr;
use snafu::OptionExt;
use super::error::{EmptyInputFieldSnafu, MissingInputFieldSnafu};
use crate::etl::error::{Error, Result};
use crate::etl::find_key_index;
/// Information about the input field including the name and index in intermediate keys.
@@ -56,7 +60,7 @@ impl OneInputOneOutputField {
intermediate_keys: &[String],
input_field: &str,
target_field: &str,
) -> Result<Self, String> {
) -> Result<Self> {
let input_index = find_key_index(intermediate_keys, input_field, processor_kind)?;
let input_field_info = InputFieldInfo::new(input_field, input_index);
@@ -145,19 +149,19 @@ pub struct Field {
}
impl FromStr for Field {
type Err = String;
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
fn from_str(s: &str) -> Result<Self> {
let mut parts = s.split(',');
let input_field = parts
.next()
.ok_or("input field is missing")?
.context(MissingInputFieldSnafu)?
.trim()
.to_string();
let target_field = parts.next().map(|x| x.trim().to_string());
if input_field.is_empty() {
return Err("input field is empty".to_string());
return EmptyInputFieldSnafu.fail();
}
Ok(Field {

View File

@@ -36,10 +36,16 @@ use itertools::Itertools;
use join::{JoinProcessor, JoinProcessorBuilder};
use letter::{LetterProcessor, LetterProcessorBuilder};
use regex::{RegexProcessor, RegexProcessorBuilder};
use snafu::{OptionExt, ResultExt};
use timestamp::{TimestampProcessor, TimestampProcessorBuilder};
use urlencoding::{UrlEncodingProcessor, UrlEncodingProcessorBuilder};
use super::error::{
FailedParseFieldFromStringSnafu, FieldMustBeTypeSnafu, ProcessorKeyMustBeStringSnafu,
ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu, UnsupportedProcessorSnafu,
};
use super::field::{Field, Fields};
use crate::etl::error::{Error, Result};
use crate::etl::value::Value;
const FIELD_NAME: &str = "field";
@@ -70,7 +76,7 @@ pub trait Processor: std::fmt::Debug + Send + Sync + 'static {
fn ignore_missing(&self) -> bool;
/// Execute the processor on a vector which be preprocessed by the pipeline
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<(), String>;
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<()>;
}
#[derive(Debug)]
@@ -98,7 +104,7 @@ pub trait ProcessorBuilder: std::fmt::Debug + Send + Sync + 'static {
/// Get the processor's input keys
fn input_keys(&self) -> HashSet<&str>;
/// Build the processor
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind, String>;
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind>;
}
#[derive(Debug)]
@@ -171,9 +177,9 @@ impl Processors {
}
impl TryFrom<&Vec<yaml_rust::Yaml>> for ProcessorBuilderList {
type Error = String;
type Error = Error;
fn try_from(vec: &Vec<yaml_rust::Yaml>) -> Result<Self, Self::Error> {
fn try_from(vec: &Vec<yaml_rust::Yaml>) -> Result<Self> {
let mut processors_builders = vec![];
let mut all_output_keys = HashSet::with_capacity(50);
let mut all_required_keys = HashSet::with_capacity(50);
@@ -226,13 +232,10 @@ impl TryFrom<&Vec<yaml_rust::Yaml>> for ProcessorBuilderList {
}
}
fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorBuilders, String> {
let map = doc.as_hash().ok_or("processor must be a map".to_string())?;
fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorBuilders> {
let map = doc.as_hash().context(ProcessorMustBeMapSnafu)?;
let key = map
.keys()
.next()
.ok_or("processor must have a string key".to_string())?;
let key = map.keys().next().context(ProcessorMustHaveStringKeySnafu)?;
let value = map
.get(key)
@@ -240,9 +243,7 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorBuilders, String> {
.as_hash()
.expect("processor value must be a map");
let str_key = key
.as_str()
.ok_or("processor key must be a string".to_string())?;
let str_key = key.as_str().context(ProcessorKeyMustBeStringSnafu)?;
let processor = match str_key {
cmcd::PROCESSOR_CMCD => ProcessorBuilders::Cmcd(CmcdProcessorBuilder::try_from(value)?),
@@ -264,58 +265,72 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorBuilders, String> {
urlencoding::PROCESSOR_URL_ENCODING => {
ProcessorBuilders::UrlEncoding(UrlEncodingProcessorBuilder::try_from(value)?)
}
_ => return Err(format!("unsupported {} processor", str_key)),
_ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
};
Ok(processor)
}
pub(crate) fn yaml_string(v: &yaml_rust::Yaml, field: &str) -> Result<String, String> {
pub(crate) fn yaml_string(v: &yaml_rust::Yaml, field: &str) -> Result<String> {
v.as_str()
.map(|s| s.to_string())
.ok_or(format!("'{field}' must be a string"))
.context(FieldMustBeTypeSnafu {
field,
ty: "string",
})
}
pub(crate) fn yaml_strings(v: &yaml_rust::Yaml, field: &str) -> Result<Vec<String>, String> {
pub(crate) fn yaml_strings(v: &yaml_rust::Yaml, field: &str) -> Result<Vec<String>> {
let vec = v
.as_vec()
.ok_or(format!("'{field}' must be a list of strings",))?
.context(FieldMustBeTypeSnafu {
field,
ty: "list of string",
})?
.iter()
.map(|v| v.as_str().unwrap_or_default().into())
.collect();
Ok(vec)
}
pub(crate) fn yaml_bool(v: &yaml_rust::Yaml, field: &str) -> Result<bool, String> {
v.as_bool().ok_or(format!("'{field}' must be a boolean"))
pub(crate) fn yaml_bool(v: &yaml_rust::Yaml, field: &str) -> Result<bool> {
v.as_bool().context(FieldMustBeTypeSnafu {
field,
ty: "boolean",
})
}
pub(crate) fn yaml_parse_string<T>(v: &yaml_rust::Yaml, field: &str) -> Result<T, String>
pub(crate) fn yaml_parse_string<T>(v: &yaml_rust::Yaml, field: &str) -> Result<T>
where
T: std::str::FromStr,
T::Err: ToString,
T::Err: std::error::Error + Send + Sync + 'static,
{
yaml_string(v, field)?
.parse::<T>()
.map_err(|e| e.to_string())
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
.context(FailedParseFieldFromStringSnafu { field })
}
pub(crate) fn yaml_parse_strings<T>(v: &yaml_rust::Yaml, field: &str) -> Result<Vec<T>, String>
pub(crate) fn yaml_parse_strings<T>(v: &yaml_rust::Yaml, field: &str) -> Result<Vec<T>>
where
T: std::str::FromStr,
T::Err: ToString,
T::Err: std::error::Error + Send + Sync + 'static,
{
yaml_strings(v, field).and_then(|v| {
v.into_iter()
.map(|s| s.parse::<T>().map_err(|e| e.to_string()))
.map(|s| {
s.parse::<T>()
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
.context(FailedParseFieldFromStringSnafu { field })
})
.collect()
})
}
pub(crate) fn yaml_new_fields(v: &yaml_rust::Yaml, field: &str) -> Result<Fields, String> {
pub(crate) fn yaml_new_fields(v: &yaml_rust::Yaml, field: &str) -> Result<Fields> {
yaml_parse_strings(v, field).map(Fields::new)
}
pub(crate) fn yaml_new_field(v: &yaml_rust::Yaml, field: &str) -> Result<Field, String> {
pub(crate) fn yaml_new_field(v: &yaml_rust::Yaml, field: &str) -> Result<Field> {
yaml_parse_string(v, field)
}

View File

@@ -15,8 +15,14 @@
use std::collections::BTreeMap;
use ahash::HashSet;
use snafu::{OptionExt, ResultExt};
use urlencoding::decode;
use crate::etl::error::{
CmcdMissingKeySnafu, CmcdMissingValueSnafu, Error, FailedToParseFloatKeySnafu,
FailedToParseIntKeySnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
ProcessorMissingFieldSnafu, Result,
};
use crate::etl::field::{Field, Fields, InputFieldInfo, OneInputMultiOutputField};
use crate::etl::find_key_index;
use crate::etl::processor::{
@@ -82,7 +88,7 @@ impl CmcdProcessorBuilder {
pub(super) fn build_cmcd_outputs(
field: &Field,
intermediate_keys: &[String],
) -> Result<(BTreeMap<String, usize>, Vec<CmcdOutputInfo>), String> {
) -> Result<(BTreeMap<String, usize>, Vec<CmcdOutputInfo>)> {
let mut output_index = BTreeMap::new();
let mut cmcd_field_outputs = Vec::with_capacity(CMCD_KEYS.len());
for cmcd in CMCD_KEYS {
@@ -119,7 +125,7 @@ impl CmcdProcessorBuilder {
}
/// build CmcdProcessor from CmcdProcessorBuilder
pub fn build(self, intermediate_keys: &[String]) -> Result<CmcdProcessor, String> {
pub fn build(self, intermediate_keys: &[String]) -> Result<CmcdProcessor> {
let mut real_fields = vec![];
let mut cmcd_outputs = Vec::with_capacity(CMCD_KEYS.len());
for field in self.fields.into_iter() {
@@ -151,7 +157,7 @@ impl ProcessorBuilder for CmcdProcessorBuilder {
self.fields.iter().map(|f| f.input_field()).collect()
}
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind, String> {
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind> {
self.build(intermediate_keys).map(ProcessorKind::Cmcd)
}
}
@@ -170,7 +176,7 @@ pub(super) struct CmcdOutputInfo {
/// index in intermediate_keys
index: usize,
/// function to resolve value
f: fn(&str, &str, Option<&str>) -> Result<Value, String>,
f: fn(&str, &str, Option<&str>) -> Result<Value>,
}
impl CmcdOutputInfo {
@@ -178,7 +184,7 @@ impl CmcdOutputInfo {
final_key: String,
key: &'static str,
index: usize,
f: fn(&str, &str, Option<&str>) -> Result<Value, String>,
f: fn(&str, &str, Option<&str>) -> Result<Value>,
) -> Self {
Self {
final_key,
@@ -201,28 +207,28 @@ impl Default for CmcdOutputInfo {
}
/// function to resolve CMCD_KEY_BS | CMCD_KEY_SU
fn bs_su(_: &str, _: &str, _: Option<&str>) -> Result<Value, String> {
fn bs_su(_: &str, _: &str, _: Option<&str>) -> Result<Value> {
Ok(Value::Boolean(true))
}
/// function to resolve CMCD_KEY_BR | CMCD_KEY_BL | CMCD_KEY_D | CMCD_KEY_DL | CMCD_KEY_MTP | CMCD_KEY_RTP | CMCD_KEY_TB
fn br_tb(s: &str, k: &str, v: Option<&str>) -> Result<Value, String> {
let v = v.ok_or(format!("{k} missing value in {s}"))?;
fn br_tb(s: &str, k: &str, v: Option<&str>) -> Result<Value> {
let v = v.context(CmcdMissingValueSnafu { k, s })?;
let val: i64 = v
.parse()
.map_err(|_| format!("failed to parse {v} as i64"))?;
.context(FailedToParseIntKeySnafu { key: k, value: v })?;
Ok(Value::Int64(val))
}
/// function to resolve CMCD_KEY_CID | CMCD_KEY_NRR | CMCD_KEY_OT | CMCD_KEY_SF | CMCD_KEY_SID | CMCD_KEY_V
fn cid_v(s: &str, k: &str, v: Option<&str>) -> Result<Value, String> {
let v = v.ok_or(format!("{k} missing value in {s}"))?;
fn cid_v(s: &str, k: &str, v: Option<&str>) -> Result<Value> {
let v = v.context(CmcdMissingValueSnafu { k, s })?;
Ok(Value::String(v.to_string()))
}
/// function to resolve CMCD_KEY_NOR
fn nor(s: &str, k: &str, v: Option<&str>) -> Result<Value, String> {
let v = v.ok_or(format!("{k} missing value in {s}"))?;
fn nor(s: &str, k: &str, v: Option<&str>) -> Result<Value> {
let v = v.context(CmcdMissingValueSnafu { k, s })?;
let val = match decode(v) {
Ok(val) => val.to_string(),
Err(_) => v.to_string(),
@@ -231,11 +237,11 @@ fn nor(s: &str, k: &str, v: Option<&str>) -> Result<Value, String> {
}
/// function to resolve CMCD_KEY_PR
fn pr(s: &str, k: &str, v: Option<&str>) -> Result<Value, String> {
let v = v.ok_or(format!("{k} missing value in {s}"))?;
fn pr(s: &str, k: &str, v: Option<&str>) -> Result<Value> {
let v = v.context(CmcdMissingValueSnafu { k, s })?;
let val: f64 = v
.parse()
.map_err(|_| format!("failed to parse {v} as f64"))?;
.context(FailedToParseFloatKeySnafu { key: k, value: v })?;
Ok(Value::Float64(val))
}
@@ -287,12 +293,12 @@ impl CmcdProcessor {
format!("{}_{}", prefix, key)
}
fn parse(&self, field_index: usize, s: &str) -> Result<Vec<(usize, Value)>, String> {
fn parse(&self, field_index: usize, s: &str) -> Result<Vec<(usize, Value)>> {
let parts = s.split(',');
let mut result = Vec::new();
for part in parts {
let mut kv = part.split('=');
let k = kv.next().ok_or(format!("{part} missing key in {s}"))?;
let k = kv.next().context(CmcdMissingKeySnafu { part, s })?;
let v = kv.next();
for cmcd_key in self.cmcd_outputs[field_index].iter() {
@@ -308,16 +314,16 @@ impl CmcdProcessor {
}
impl TryFrom<&yaml_rust::yaml::Hash> for CmcdProcessorBuilder {
type Error = String;
type Error = Error;
fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
let mut fields = Fields::default();
let mut ignore_missing = false;
for (k, v) in value.iter() {
let key = k
.as_str()
.ok_or(format!("key must be a string, but got {k:?}"))?;
.with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
match key {
FIELD_NAME => {
fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
@@ -362,7 +368,7 @@ impl Processor for CmcdProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<(), String> {
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<()> {
for (field_index, field) in self.fields.iter().enumerate() {
let field_value_index = field.input_index();
match val.get(field_value_index) {
@@ -374,18 +380,19 @@ impl Processor for CmcdProcessor {
}
Some(Value::Null) | None => {
if !self.ignore_missing {
return Err(format!(
"{} processor: missing field: {}",
self.kind(),
field.input_name()
));
return ProcessorMissingFieldSnafu {
processor: self.kind().to_string(),
field: field.input_name().to_string(),
}
.fail();
}
}
Some(v) => {
return Err(format!(
"{} processor: expect string value, but got {v:?}",
self.kind()
));
return ProcessorExpectStringSnafu {
processor: self.kind().to_string(),
v: v.clone(),
}
.fail();
}
}
}

View File

@@ -18,7 +18,12 @@ use ahash::HashSet;
use csv::{ReaderBuilder, Trim};
use itertools::EitherOrBoth::{Both, Left, Right};
use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use crate::etl::error::{
CsvNoRecordSnafu, CsvQuoteNameSnafu, CsvReadSnafu, CsvSeparatorNameSnafu, Error,
KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
};
use crate::etl::field::{Fields, InputFieldInfo, OneInputMultiOutputField};
use crate::etl::find_key_index;
use crate::etl::processor::{
@@ -53,7 +58,7 @@ pub struct CsvProcessorBuilder {
}
impl CsvProcessorBuilder {
fn build(self, intermediate_keys: &[String]) -> Result<CsvProcessor, String> {
fn build(self, intermediate_keys: &[String]) -> Result<CsvProcessor> {
let mut real_fields = vec![];
for field in self.fields {
@@ -68,7 +73,7 @@ impl CsvProcessorBuilder {
.target_fields
.iter()
.map(|f| find_key_index(intermediate_keys, f, "csv"))
.collect::<Result<Vec<_>, String>>()?;
.collect::<Result<Vec<_>>>()?;
Ok(CsvProcessor {
reader: self.reader,
fields: real_fields,
@@ -88,7 +93,7 @@ impl ProcessorBuilder for CsvProcessorBuilder {
self.fields.iter().map(|f| f.input_field()).collect()
}
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind, String> {
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind> {
self.build(intermediate_keys).map(ProcessorKind::Csv)
}
}
@@ -114,11 +119,11 @@ pub struct CsvProcessor {
impl CsvProcessor {
// process the csv format string to a map with target_fields as keys
fn process(&self, val: &str) -> Result<Vec<(usize, Value)>, String> {
fn process(&self, val: &str) -> Result<Vec<(usize, Value)>> {
let mut reader = self.reader.from_reader(val.as_bytes());
if let Some(result) = reader.records().next() {
let record: csv::StringRecord = result.map_err(|e| e.to_string())?;
let record: csv::StringRecord = result.context(CsvReadSnafu)?;
let values: Vec<(usize, Value)> = self
.output_index_info
@@ -142,15 +147,15 @@ impl CsvProcessor {
Ok(values)
} else {
Err("expected at least one record from csv format, but got none".into())
CsvNoRecordSnafu.fail()
}
}
}
impl TryFrom<&yaml_rust::yaml::Hash> for CsvProcessorBuilder {
type Error = String;
type Error = Error;
fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
let mut reader = ReaderBuilder::new();
reader.has_headers(false);
@@ -162,7 +167,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for CsvProcessorBuilder {
for (k, v) in hash {
let key = k
.as_str()
.ok_or(format!("key must be a string, but got {k:?}"))?;
.with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
match key {
FIELD_NAME => {
fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
@@ -180,10 +185,11 @@ impl TryFrom<&yaml_rust::yaml::Hash> for CsvProcessorBuilder {
SEPARATOR_NAME => {
let separator = yaml_string(v, SEPARATOR_NAME)?;
if separator.len() != 1 {
return Err(format!(
"'{}' must be a single character, but got '{}'",
SEPARATOR_NAME, separator
));
return CsvSeparatorNameSnafu {
separator: SEPARATOR_NAME,
value: separator,
}
.fail();
} else {
reader.delimiter(separator.as_bytes()[0]);
}
@@ -191,10 +197,11 @@ impl TryFrom<&yaml_rust::yaml::Hash> for CsvProcessorBuilder {
QUOTE_NAME => {
let quote = yaml_string(v, QUOTE_NAME)?;
if quote.len() != 1 {
return Err(format!(
"'{}' must be a single character, but got '{}'",
QUOTE_NAME, quote
));
return CsvQuoteNameSnafu {
quote: QUOTE_NAME,
value: quote,
}
.fail();
} else {
reader.quote(quote.as_bytes()[0]);
}
@@ -240,7 +247,7 @@ impl Processor for CsvProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<(), String> {
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<()> {
for field in self.fields.iter() {
let index = field.input_index();
match val.get(index) {
@@ -252,18 +259,19 @@ impl Processor for CsvProcessor {
}
Some(Value::Null) | None => {
if !self.ignore_missing {
return Err(format!(
"{} processor: missing field: {}",
self.kind(),
field.input_name()
));
return ProcessorMissingFieldSnafu {
processor: self.kind().to_string(),
field: field.input_name().to_string(),
}
.fail();
}
}
Some(v) => {
return Err(format!(
"{} processor: expect string value, but got {v:?}",
self.kind()
));
return ProcessorExpectStringSnafu {
processor: self.kind().to_string(),
v: v.clone(),
}
.fail();
}
}
}

View File

@@ -18,7 +18,13 @@ use ahash::HashSet;
use chrono::{DateTime, NaiveDateTime};
use chrono_tz::Tz;
use lazy_static::lazy_static;
use snafu::{OptionExt, ResultExt};
use crate::etl::error::{
DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateParseSnafu,
DateParseTimezoneSnafu, Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu, Result,
};
use crate::etl::field::{Fields, OneInputOneOutputField};
use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor,
@@ -103,13 +109,13 @@ impl ProcessorBuilder for DateProcessorBuilder {
self.fields.iter().map(|f| f.input_field()).collect()
}
/// Builds the date processor and wraps it in `ProcessorKind::Date`.
///
/// Delegates to `self.build(intermediate_keys)` (the `DateProcessorBuilder` impl below declares
/// a `build` returning `DateProcessor`) and forwards any error unchanged through `map`.
// NOTE(review): diff rendering without +/- markers — the signature ending in `, String>` is
// the pre-change line and the one directly below it is its replacement.
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind, String> {
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind> {
self.build(intermediate_keys).map(ProcessorKind::Date)
}
}
impl DateProcessorBuilder {
pub fn build(self, intermediate_keys: &[String]) -> Result<DateProcessor, String> {
pub fn build(self, intermediate_keys: &[String]) -> Result<DateProcessor> {
let mut real_fields = vec![];
for field in self.fields.into_iter() {
let input = OneInputOneOutputField::build(
@@ -131,9 +137,9 @@ impl DateProcessorBuilder {
}
impl TryFrom<&yaml_rust::yaml::Hash> for DateProcessorBuilder {
type Error = String;
type Error = Error;
fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
let mut fields = Fields::default();
let mut formats = Formats::default();
let mut timezone = None;
@@ -143,7 +149,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for DateProcessorBuilder {
for (k, v) in hash {
let key = k
.as_str()
.ok_or(format!("key must be a string, but got {k:?}"))?;
.with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
match key {
FIELD_NAME => {
@@ -205,10 +211,12 @@ pub struct DateProcessor {
}
impl DateProcessor {
fn parse(&self, val: &str) -> Result<Timestamp, String> {
fn parse(&self, val: &str) -> Result<Timestamp> {
let mut tz = Tz::UTC;
if let Some(timezone) = &self.timezone {
tz = timezone.parse::<Tz>().map_err(|e| e.to_string())?;
tz = timezone.parse::<Tz>().context(DateParseTimezoneSnafu {
value: timezone.as_ref(),
})?;
}
for fmt in self.formats.iter() {
@@ -217,7 +225,11 @@ impl DateProcessor {
}
}
Err(format!("{} processor: failed to parse {val}", self.kind(),))
ProcessorFailedToParseStringSnafu {
kind: PROCESSOR_DATE.to_string(),
value: val.to_string(),
}
.fail()
}
}
@@ -230,7 +242,7 @@ impl Processor for DateProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<(), String> {
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<()> {
for field in self.fields.iter() {
let index = field.input_index();
match val.get(index) {
@@ -241,18 +253,19 @@ impl Processor for DateProcessor {
}
Some(Value::Null) | None => {
if !self.ignore_missing {
return Err(format!(
"{} processor: missing field: {}",
self.kind(),
field.input_name()
));
return ProcessorMissingFieldSnafu {
processor: self.kind().to_string(),
field: field.input_name().to_string(),
}
.fail();
}
}
Some(v) => {
return Err(format!(
"{} processor: expect string value, but got {v:?}",
self.kind()
));
return ProcessorExpectStringSnafu {
processor: self.kind().to_string(),
v: v.clone(),
}
.fail();
}
}
}
@@ -261,16 +274,20 @@ impl Processor for DateProcessor {
}
/// Parses `val` with the format string `fmt` into a timestamp taken from
/// `timestamp_nanos_opt()`.
///
/// The value is first parsed as a timezone-aware `DateTime`; if that fails, it is parsed as a
/// `NaiveDateTime` and then placed in `tz` via `and_local_timezone(tz).single()`.
///
/// # Errors
///
/// In the post-change lines:
/// - `DateParseSnafu` (carrying `val`) when the `NaiveDateTime` fallback cannot parse the input;
/// - `DateFailedToGetLocalTimezoneSnafu` when `.single()` yields `None`;
/// - `DateFailedToGetTimestampSnafu` when `timestamp_nanos_opt()` yields `None`.
// NOTE(review): diff rendering without +/- markers — each `String`-based line (`ok_or("…")`,
// `map_err(|e| e.to_string())`, the `Result<i64, String>` signature) is the removed version
// and is followed by its snafu `.context(…)` replacement.
fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result<i64, String> {
fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result<i64> {
// Timezone-aware attempt: the offset comes from the input itself, `tz` is not consulted.
if let Ok(dt) = DateTime::parse_from_str(val, fmt) {
Ok(dt.timestamp_nanos_opt().ok_or("failed to get timestamp")?)
Ok(dt
.timestamp_nanos_opt()
.context(DateFailedToGetTimestampSnafu)?)
} else {
// Fallback: parse without a timezone, then interpret the result in `tz`.
let dt = NaiveDateTime::parse_from_str(val, fmt)
.map_err(|e| e.to_string())?
.context(DateParseSnafu { value: val })?
.and_local_timezone(tz)
.single()
.ok_or("failed to get local timezone")?;
Ok(dt.timestamp_nanos_opt().ok_or("failed to get timestamp")?)
.context(DateFailedToGetLocalTimezoneSnafu)?;
Ok(dt
.timestamp_nanos_opt()
.context(DateFailedToGetTimestampSnafu)?)
}
}

View File

@@ -16,7 +16,15 @@ use std::ops::Deref;
use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
use itertools::Itertools;
use snafu::OptionExt;
use crate::etl::error::{
DissectAppendOrderAlreadySetSnafu, DissectConsecutiveNamesSnafu, DissectEmptyPatternSnafu,
DissectEndModifierAlreadySetSnafu, DissectInvalidPatternSnafu, DissectModifierAlreadySetSnafu,
DissectNoMatchingPatternSnafu, DissectOrderOnlyAppendModifierSnafu,
DissectOrderOnlyAppendSnafu, DissectSplitExceedsInputSnafu, DissectSplitNotMatchInputSnafu,
Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
};
use crate::etl::field::{Fields, InputFieldInfo, OneInputMultiOutputField};
use crate::etl::find_key_index;
use crate::etl::processor::{
@@ -77,9 +85,13 @@ impl NameInfo {
self.name.is_empty() && self.start_modifier.is_none() && self.end_modifier.is_none()
}
fn try_start_modifier(&mut self, modifier: StartModifier) -> Result<(), String> {
fn try_start_modifier(&mut self, modifier: StartModifier) -> Result<()> {
match &self.start_modifier {
Some(m) => Err(format!("'{m}' modifier already set, but found {modifier}",)),
Some(m) => DissectModifierAlreadySetSnafu {
m: m.to_string(),
modifier: modifier.to_string(),
}
.fail(),
None => {
self.start_modifier = Some(modifier);
Ok(())
@@ -87,27 +99,27 @@ impl NameInfo {
}
}
/// Records `order` on the current `StartModifier::Append` modifier.
///
/// Succeeds only when `start_modifier` is `Append` with no order stored yet; in that case the
/// order is written in place and `Ok(())` is returned.
///
/// # Errors
///
/// In the post-change lines:
/// - `DissectAppendOrderAlreadySetSnafu` when the append modifier already holds an order;
/// - `DissectOrderOnlyAppendSnafu` when a different start modifier is set;
/// - `DissectOrderOnlyAppendModifierSnafu` when no start modifier is set at all.
// NOTE(review): diff rendering without +/- markers — every `Err(format!(…))` /
// `Err("…".to_string())` arm is the removed version; the `…Snafu { … }.fail()` arm that
// follows it is the replacement.
fn try_append_order(&mut self, order: u32) -> Result<(), String> {
fn try_append_order(&mut self, order: u32) -> Result<()> {
match &mut self.start_modifier {
Some(StartModifier::Append(o)) => match o {
Some(n) => Err(format!(
"Append Order modifier is already set to '{n}', cannot be set to {order}"
)),
Some(n) => DissectAppendOrderAlreadySetSnafu {
n: n.to_string(),
order,
}
.fail(),
None => {
// `o` borrows the modifier's slot mutably, so this assignment updates `self`.
*o = Some(order);
Ok(())
}
},
Some(m) => Err(format!(
"Order can only be set to Append Modifier, current modifier is {m}"
)),
None => Err("Order can only be set to Append Modifier".to_string()),
Some(m) => DissectOrderOnlyAppendSnafu { m: m.to_string() }.fail(),
None => DissectOrderOnlyAppendModifierSnafu.fail(),
}
}
fn try_end_modifier(&mut self) -> Result<(), String> {
fn try_end_modifier(&mut self) -> Result<()> {
match &self.end_modifier {
Some(m) => Err(format!("End modifier already set: '{m}'")),
Some(m) => DissectEndModifierAlreadySetSnafu { m: m.to_string() }.fail(),
None => {
self.end_modifier = Some(EndModifier);
Ok(())
@@ -290,9 +302,9 @@ impl std::ops::DerefMut for PatternInfo {
}
impl std::str::FromStr for PatternInfo {
type Err = String;
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
fn from_str(s: &str) -> Result<Self> {
let mut parts = vec![];
let mut cursor = PartInfo::empty_split();
@@ -338,9 +350,11 @@ impl std::str::FromStr for PatternInfo {
}
if j == pos + 1 {
return Err(format!(
"Invalid Pattern: '{s}'. Digit order must be set after '/'",
));
return DissectInvalidPatternSnafu {
s,
detail: "Digit order must be set after '/'",
}
.fail();
}
name.try_append_order(order)?;
@@ -358,14 +372,20 @@ impl std::str::FromStr for PatternInfo {
('-', PartInfo::Name(name)) if !name.is_end_modifier_set() => {
if let Some('>') = chars.get(pos + 1) {
} else {
return Err(format!(
"Invalid Pattern: '{s}'. expected '->' but only '-'",
));
return DissectInvalidPatternSnafu {
s,
detail: "Expected '->' but only '-'",
}
.fail();
}
if let Some('}') = chars.get(pos + 2) {
} else {
return Err(format!("Invalid Pattern: '{s}'. expected '}}' after '->'",));
return DissectInvalidPatternSnafu {
s,
detail: "Expected '}' after '->'",
}
.fail();
}
name.try_end_modifier()?;
@@ -377,7 +397,7 @@ impl std::str::FromStr for PatternInfo {
} else {
format!("Invalid '{ch}' in '{name}'")
};
return Err(format!("Invalid Pattern: '{s}'. {tail}"));
return DissectInvalidPatternSnafu { s, detail: tail }.fail();
}
(_, PartInfo::Name(_)) => {
cursor.push(ch);
@@ -390,7 +410,11 @@ impl std::str::FromStr for PatternInfo {
match cursor {
PartInfo::Split(ref split) if !split.is_empty() => parts.push(cursor),
PartInfo::Name(name) if !name.is_empty() => {
return Err(format!("Invalid Pattern: '{s}'. '{name}' is not closed"))
return DissectInvalidPatternSnafu {
s,
detail: format!("'{name}' is not closed"),
}
.fail();
}
_ => {}
}
@@ -402,9 +426,9 @@ impl std::str::FromStr for PatternInfo {
}
impl PatternInfo {
fn check(&self) -> Result<(), String> {
fn check(&self) -> Result<()> {
if self.len() == 0 {
return Err("Empty pattern is not allowed".to_string());
return DissectEmptyPatternSnafu.fail();
}
let mut map_keys = HashSet::new();
@@ -415,42 +439,47 @@ impl PatternInfo {
let next_part = self.get(i + 1);
match (this_part, next_part) {
(PartInfo::Split(split), _) if split.is_empty() => {
return Err(format!(
"Invalid Pattern: '{}'. Empty split is not allowed",
self.origin
));
return DissectInvalidPatternSnafu {
s: &self.origin,
detail: "Empty split is not allowed",
}
.fail();
}
(PartInfo::Name(name1), Some(PartInfo::Name(name2))) => {
return Err(format!(
"Invalid Pattern: '{}'. consecutive names are not allowed: '{}' '{}'",
self.origin, name1, name2
));
return DissectInvalidPatternSnafu {
s: &self.origin,
detail: format!("consecutive names are not allowed: '{name1}' '{name2}'",),
}
.fail();
}
(PartInfo::Name(name), _) if name.is_name_empty() => {
if let Some(ref m) = name.start_modifier {
return Err(format!(
"Invalid Pattern: '{}'. only '{}' modifier is invalid",
self.origin, m
));
return DissectInvalidPatternSnafu {
s: &self.origin,
detail: format!("only '{m}' modifier is invalid"),
}
.fail();
}
}
(PartInfo::Name(name), _) => match name.start_modifier {
Some(StartModifier::MapKey) => {
if map_keys.contains(&name.name) {
return Err(format!(
"Invalid Pattern: '{}'. Duplicate map key: '{}'",
self.origin, name.name
));
return DissectInvalidPatternSnafu {
s: &self.origin,
detail: format!("Duplicate map key: '{}'", name.name),
}
.fail();
} else {
map_keys.insert(&name.name);
}
}
Some(StartModifier::MapVal) => {
if map_vals.contains(&name.name) {
return Err(format!(
"Invalid Pattern: '{}'. Duplicate map val: '{}'",
self.origin, name.name
));
return DissectInvalidPatternSnafu {
s: &self.origin,
detail: format!("Duplicate map val: '{}'", name.name),
}
.fail();
} else {
map_vals.insert(&name.name);
}
@@ -462,15 +491,18 @@ impl PatternInfo {
}
if map_keys != map_vals {
return Err(format!(
"Invalid Pattern: '{}'. key and value not matched: '{}'",
self.origin,
map_keys
.symmetric_difference(&map_vals)
.map(|s| s.as_str())
.collect::<Vec<&str>>()
.join(",")
));
return DissectInvalidPatternSnafu {
s: &self.origin,
detail: format!(
"key and value not matched: '{}'",
map_keys
.symmetric_difference(&map_vals)
.map(|s| s.as_str())
.collect::<Vec<&str>>()
.join(",")
),
}
.fail();
}
Ok(())
@@ -516,10 +548,7 @@ impl DissectProcessorBuilder {
.collect()
}
fn part_info_to_part(
part_info: PartInfo,
intermediate_keys: &[String],
) -> Result<Part, String> {
fn part_info_to_part(part_info: PartInfo, intermediate_keys: &[String]) -> Result<Part> {
match part_info {
PartInfo::Split(s) => Ok(Part::Split(s)),
PartInfo::Name(n) => match n.start_modifier {
@@ -545,13 +574,13 @@ impl DissectProcessorBuilder {
fn pattern_info_to_pattern(
pattern_info: PatternInfo,
intermediate_keys: &[String],
) -> Result<Pattern, String> {
) -> Result<Pattern> {
let original = pattern_info.origin;
let pattern = pattern_info
.parts
.into_iter()
.map(|part_info| Self::part_info_to_part(part_info, intermediate_keys))
.collect::<Result<Vec<_>, String>>()?;
.collect::<Result<Vec<_>>>()?;
Ok(Pattern {
origin: original,
parts: pattern,
@@ -561,7 +590,7 @@ impl DissectProcessorBuilder {
fn build_patterns_from_pattern_infos(
patterns: Vec<PatternInfo>,
intermediate_keys: &[String],
) -> Result<Vec<Pattern>, String> {
) -> Result<Vec<Pattern>> {
patterns
.into_iter()
.map(|pattern_info| Self::pattern_info_to_pattern(pattern_info, intermediate_keys))
@@ -578,7 +607,7 @@ impl ProcessorBuilder for DissectProcessorBuilder {
self.fields.iter().map(|f| f.input_field()).collect()
}
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind, String> {
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind> {
let mut real_fields = vec![];
for field in self.fields.into_iter() {
let input_index = find_key_index(intermediate_keys, field.input_field(), "dissect")?;
@@ -610,11 +639,7 @@ pub struct DissectProcessor {
}
impl DissectProcessor {
fn process_pattern(
&self,
chs: &[char],
pattern: &Pattern,
) -> Result<Vec<(usize, Value)>, String> {
fn process_pattern(&self, chs: &[char], pattern: &Pattern) -> Result<Vec<(usize, Value)>> {
let mut map = Vec::new();
let mut pos = 0;
@@ -668,23 +693,26 @@ impl DissectProcessor {
let split_chs = split.chars().collect::<Vec<char>>();
let split_len = split_chs.len();
if pos + split_len > chs.len() {
return Err(format!("'{split}' exceeds the input",));
return DissectSplitExceedsInputSnafu { split }.fail();
}
if &chs[pos..pos + split_len] != split_chs.as_slice() {
return Err(format!(
"'{split}' does not match the input '{}'",
chs[pos..pos + split_len].iter().collect::<String>()
));
return DissectSplitNotMatchInputSnafu {
split,
input: chs[pos..pos + split_len].iter().collect::<String>(),
}
.fail();
}
pos += split_len;
}
(Part::Name(name1), Some(Part::Name(name2))) => {
return Err(format!(
"consecutive names are not allowed: '{name1}' '{name2}'"
));
return DissectConsecutiveNamesSnafu {
name1: name1.to_string(),
name2: name2.to_string(),
}
.fail();
}
// if Name part is the last part, then the rest of the input is the value
@@ -695,10 +723,10 @@ impl DissectProcessor {
// if Name part, and next part is Split, then find the matched value of the name
(Part::Name(name), Some(Part::Split(split))) => {
let stop = split
.chars()
.next()
.ok_or("Empty split is not allowed".to_string())?; // this won't happen
let stop = split.chars().next().context(DissectInvalidPatternSnafu {
s: &pattern.origin,
detail: "Empty split is not allowed",
})?; // this won't happen
let mut end = pos;
while end < chs.len() && chs[end] != stop {
end += 1;
@@ -737,7 +765,7 @@ impl DissectProcessor {
Ok(map)
}
fn process(&self, val: &str) -> Result<Vec<(usize, Value)>, String> {
fn process(&self, val: &str) -> Result<Vec<(usize, Value)>> {
let chs = val.chars().collect::<Vec<char>>();
for pattern in &self.patterns {
@@ -745,15 +773,14 @@ impl DissectProcessor {
return Ok(map);
}
}
Err("No matching pattern found".to_string())
DissectNoMatchingPatternSnafu.fail()
}
}
impl TryFrom<&yaml_rust::yaml::Hash> for DissectProcessorBuilder {
type Error = String;
type Error = Error;
fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
let mut fields = Fields::default();
let mut patterns = vec![];
let mut ignore_missing = false;
@@ -762,7 +789,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for DissectProcessorBuilder {
for (k, v) in value.iter() {
let key = k
.as_str()
.ok_or(format!("key must be a string, but got '{k:?}'"))?;
.with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
match key {
FIELD_NAME => {
@@ -809,7 +836,7 @@ impl Processor for DissectProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<(), String> {
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<()> {
for field in self.fields.iter() {
let index = field.input_index();
match val.get(index) {
@@ -821,18 +848,19 @@ impl Processor for DissectProcessor {
}
Some(Value::Null) | None => {
if !self.ignore_missing {
return Err(format!(
"{} processor: missing field: {}",
self.kind(),
field.input_name()
));
return ProcessorMissingFieldSnafu {
processor: self.kind(),
field: field.input_name(),
}
.fail();
}
}
Some(v) => {
return Err(format!(
"{} processor: expect string value, but got {v:?}",
self.kind()
));
return ProcessorExpectStringSnafu {
processor: self.kind(),
v: v.clone(),
}
.fail();
}
}
}
@@ -1123,7 +1151,7 @@ mod tests {
),
(
"%{->clientip} ",
"Invalid Pattern: '%{->clientip} '. expected '}' after '->'",
"Invalid Pattern: '%{->clientip} '. Expected '}' after '->'",
),
(
"%{/clientip} ",
@@ -1185,7 +1213,7 @@ mod tests {
for (pattern, expected) in cases.into_iter() {
let err = pattern.parse::<PatternInfo>().unwrap_err();
assert_eq!(err, expected);
assert_eq!(err.to_string(), expected);
}
}

View File

@@ -13,7 +13,12 @@
// limitations under the License.
use ahash::HashSet;
use snafu::{OptionExt, ResultExt};
use crate::etl::error::{
EpochInvalidResolutionSnafu, Error, FailedToParseIntSnafu, KeyMustBeStringSnafu,
ProcessorMissingFieldSnafu, ProcessorUnsupportedValueSnafu, Result,
};
use crate::etl::field::{Fields, OneInputOneOutputField};
use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, ProcessorBuilder,
@@ -39,15 +44,15 @@ enum Resolution {
}
/// Converts a resolution name into a `Resolution` variant.
///
/// Each variant accepts three spellings, given by the `*_RESOLUTION` constants in the match
/// arms. The comparison is an exact string match; no trimming or case folding is done here.
// NOTE(review): diff rendering without +/- markers — `type Error = String`, the
// `Result<Self, Self::Error>` signature and the `Err(format!(…))` arm are the removed lines;
// each is followed by its replacement.
impl TryFrom<&str> for Resolution {
type Error = String;
type Error = Error;
/// Returns the matching variant, or (post-change) an `EpochInvalidResolutionSnafu` error
/// carrying the rejected input `s`.
fn try_from(s: &str) -> Result<Self, Self::Error> {
fn try_from(s: &str) -> Result<Self> {
match s {
SECOND_RESOLUTION | SEC_RESOLUTION | S_RESOLUTION => Ok(Resolution::Second),
MILLISECOND_RESOLUTION | MILLI_RESOLUTION | MS_RESOLUTION => Ok(Resolution::Milli),
MICROSECOND_RESOLUTION | MICRO_RESOLUTION | US_RESOLUTION => Ok(Resolution::Micro),
NANOSECOND_RESOLUTION | NANO_RESOLUTION | NS_RESOLUTION => Ok(Resolution::Nano),
_ => Err(format!("invalid resolution: {s}")),
_ => EpochInvalidResolutionSnafu { resolution: s }.fail(),
}
}
}
@@ -71,13 +76,13 @@ impl ProcessorBuilder for EpochProcessorBuilder {
self.fields.iter().map(|f| f.input_field()).collect()
}
/// Builds the epoch processor and wraps it in `ProcessorKind::Epoch`.
///
/// Delegates to `self.build(intermediate_keys)` (the `EpochProcessorBuilder` impl below declares
/// a `build` returning `EpochProcessor`) and forwards any error unchanged through `map`.
// NOTE(review): diff rendering without +/- markers — the signature ending in `, String>` is
// the pre-change line and the one directly below it is its replacement.
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind, String> {
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind> {
self.build(intermediate_keys).map(ProcessorKind::Epoch)
}
}
impl EpochProcessorBuilder {
pub fn build(self, intermediate_keys: &[String]) -> Result<EpochProcessor, String> {
pub fn build(self, intermediate_keys: &[String]) -> Result<EpochProcessor> {
let mut real_fields = vec![];
for field in self.fields.into_iter() {
let input = OneInputOneOutputField::build(
@@ -112,11 +117,11 @@ pub struct EpochProcessor {
}
impl EpochProcessor {
fn parse(&self, val: &Value) -> Result<Timestamp, String> {
fn parse(&self, val: &Value) -> Result<Timestamp> {
let t: i64 = match val {
Value::String(s) => s
.parse::<i64>()
.map_err(|e| format!("Failed to parse {} to number: {}", s, e))?,
.context(FailedToParseIntSnafu { value: s })?,
Value::Int16(i) => *i as i64,
Value::Int32(i) => *i as i64,
Value::Int64(i) => *i,
@@ -135,9 +140,11 @@ impl EpochProcessor {
},
_ => {
return Err(format!(
"{PROCESSOR_EPOCH} processor: unsupported value {val}"
))
return ProcessorUnsupportedValueSnafu {
processor: PROCESSOR_EPOCH,
val: val.to_string(),
}
.fail();
}
};
@@ -151,9 +158,9 @@ impl EpochProcessor {
}
impl TryFrom<&yaml_rust::yaml::Hash> for EpochProcessorBuilder {
type Error = String;
type Error = Error;
fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
let mut fields = Fields::default();
let mut resolution = Resolution::default();
let mut ignore_missing = false;
@@ -161,7 +168,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for EpochProcessorBuilder {
for (k, v) in hash {
let key = k
.as_str()
.ok_or(format!("key must be a string, but got {k:?}"))?;
.with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
match key {
FIELD_NAME => {
@@ -200,17 +207,17 @@ impl Processor for EpochProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<(), String> {
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<()> {
for field in self.fields.iter() {
let index = field.input_index();
match val.get(index) {
Some(Value::Null) | None => {
if !self.ignore_missing {
return Err(format!(
"{} processor: missing field: {}",
self.kind(),
field.input_name()
));
return ProcessorMissingFieldSnafu {
processor: self.kind(),
field: field.input_name(),
}
.fail();
}
}
Some(v) => {

View File

@@ -14,11 +14,16 @@
use ahash::HashSet;
use regex::Regex;
use snafu::{OptionExt, ResultExt};
use crate::etl::error::{
Error, GsubPatternRequiredSnafu, GsubReplacementRequiredSnafu, KeyMustBeStringSnafu,
ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, RegexSnafu, Result,
};
use crate::etl::field::{Fields, OneInputOneOutputField};
use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, ProcessorBuilder,
ProcessorKind, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, PATTERN_NAME,
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, ProcessorBuilder, ProcessorKind,
FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, PATTERN_NAME,
};
use crate::etl::value::Value;
@@ -46,25 +51,25 @@ impl ProcessorBuilder for GsubProcessorBuilder {
self.fields.iter().map(|f| f.input_field()).collect()
}
/// Builds the gsub processor and wraps it in `ProcessorKind::Gsub`.
///
/// Delegates to `self.build(intermediate_keys)` (the `GsubProcessorBuilder` impl below declares
/// a `build` returning `GsubProcessor`) and forwards any error unchanged through `map`.
// NOTE(review): diff rendering without +/- markers — the signature ending in `, String>` is
// the pre-change line and the one directly below it is its replacement.
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind, String> {
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind> {
self.build(intermediate_keys).map(ProcessorKind::Gsub)
}
}
impl GsubProcessorBuilder {
/// Validates that the builder has both a pattern and a replacement, returning it unchanged.
///
/// Takes `self` by value and hands it back in `Ok`, so it can sit in a builder chain.
///
/// # Errors
///
/// In the post-change lines: `GsubPatternRequiredSnafu` when `pattern` is `None`, otherwise
/// `GsubReplacementRequiredSnafu` when `replacement` is `None`. The pattern is checked first.
// NOTE(review): diff rendering without +/- markers — each `return Err("…".to_string())` is
// the removed line and the `return …Snafu.fail()` below it is the replacement.
fn check(self) -> Result<Self, String> {
fn check(self) -> Result<Self> {
if self.pattern.is_none() {
return Err("pattern is required".to_string());
return GsubPatternRequiredSnafu.fail();
}
if self.replacement.is_none() {
return Err("replacement is required".to_string());
return GsubReplacementRequiredSnafu.fail();
}
Ok(self)
}
fn build(self, intermediate_keys: &[String]) -> Result<GsubProcessor, String> {
fn build(self, intermediate_keys: &[String]) -> Result<GsubProcessor> {
let mut real_fields = vec![];
for field in self.fields.into_iter() {
let input = OneInputOneOutputField::build(
@@ -94,19 +99,19 @@ pub struct GsubProcessor {
}
impl GsubProcessor {
/// Validates that the processor has both a pattern and a replacement, returning it unchanged.
///
/// Performs the same two checks as the builder's `check`. `process_string` below unwraps
/// `replacement`, so it depends on this invariant holding.
///
/// # Errors
///
/// In the post-change lines: `GsubPatternRequiredSnafu` when `pattern` is `None`, otherwise
/// `GsubReplacementRequiredSnafu` when `replacement` is `None`. The pattern is checked first.
// NOTE(review): diff rendering without +/- markers — each `return Err("…".to_string())` is
// the removed line and the `return …Snafu.fail()` below it is the replacement.
fn check(self) -> Result<Self, String> {
fn check(self) -> Result<Self> {
if self.pattern.is_none() {
return Err("pattern is required".to_string());
return GsubPatternRequiredSnafu.fail();
}
if self.replacement.is_none() {
return Err("replacement is required".to_string());
return GsubReplacementRequiredSnafu.fail();
}
Ok(self)
}
fn process_string(&self, val: &str) -> Result<Value, String> {
fn process_string(&self, val: &str) -> Result<Value> {
let replacement = self.replacement.as_ref().unwrap();
let new_val = self
.pattern
@@ -119,21 +124,26 @@ impl GsubProcessor {
Ok(val)
}
/// Applies the substitution to a single value.
///
/// Only `Value::String` is handled, by forwarding to `process_string`; every other variant is
/// rejected.
///
/// # Errors
///
/// In the post-change lines: `ProcessorExpectStringSnafu` with `processor: PROCESSOR_GSUB` and
/// a clone of the offending value.
// NOTE(review): diff rendering without +/- markers — the `Err(format!(…))` arm is the removed
// version and the `ProcessorExpectStringSnafu { … }.fail()` arm is the replacement.
fn process(&self, val: &Value) -> Result<Value, String> {
fn process(&self, val: &Value) -> Result<Value> {
match val {
Value::String(val) => self.process_string(val),
_ => Err(format!(
"{} processor: expect string or array string, but got {val:?}",
self.kind()
)),
_ => ProcessorExpectStringSnafu {
processor: PROCESSOR_GSUB,
v: val.clone(),
}
.fail(),
// NOTE(review): the four lines below are a commented-out copy of the removed arm that the
// commit left behind; candidate for deletion. The old text said "string or array string",
// which the shared `ProcessorExpectString` error may no longer convey — confirm intended.
// Err(format!(
// "{} processor: expect string or array string, but got {val:?}",
// self.kind()
// )),
}
}
}
impl TryFrom<&yaml_rust::yaml::Hash> for GsubProcessorBuilder {
type Error = String;
type Error = Error;
fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
let mut fields = Fields::default();
let mut ignore_missing = false;
let mut pattern = None;
@@ -142,7 +152,8 @@ impl TryFrom<&yaml_rust::yaml::Hash> for GsubProcessorBuilder {
for (k, v) in value.iter() {
let key = k
.as_str()
.ok_or(format!("key must be a string, but got {k:?}"))?;
.with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
match key {
FIELD_NAME => {
fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
@@ -152,7 +163,9 @@ impl TryFrom<&yaml_rust::yaml::Hash> for GsubProcessorBuilder {
}
PATTERN_NAME => {
let pattern_str = yaml_string(v, PATTERN_NAME)?;
pattern = Some(Regex::new(&pattern_str).map_err(|e| e.to_string())?);
pattern = Some(Regex::new(&pattern_str).context(RegexSnafu {
pattern: pattern_str,
})?);
}
REPLACEMENT_NAME => {
let replacement_str = yaml_string(v, REPLACEMENT_NAME)?;
@@ -187,17 +200,17 @@ impl crate::etl::processor::Processor for GsubProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<(), String> {
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<()> {
for field in self.fields.iter() {
let index = field.input_index();
match val.get(index) {
Some(Value::Null) | None => {
if !self.ignore_missing {
return Err(format!(
"{} processor: missing field: {}",
self.kind(),
field.input_name()
));
return ProcessorMissingFieldSnafu {
processor: self.kind(),
field: field.input_name(),
}
.fail();
}
}
Some(v) => {

View File

@@ -13,7 +13,12 @@
// limitations under the License.
use ahash::HashSet;
use snafu::OptionExt;
use crate::etl::error::{
Error, JoinSeparatorRequiredSnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
ProcessorMissingFieldSnafu, Result,
};
use crate::etl::field::{Fields, OneInputOneOutputField};
use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, ProcessorBuilder,
@@ -42,21 +47,21 @@ impl ProcessorBuilder for JoinProcessorBuilder {
self.fields.iter().map(|f| f.input_field()).collect()
}
/// Builds the join processor and wraps it in `ProcessorKind::Join`.
///
/// Delegates to `self.build(intermediate_keys)` (the `JoinProcessorBuilder` impl below declares
/// a `build` returning `JoinProcessor`) and forwards any error unchanged through `map`.
// NOTE(review): diff rendering without +/- markers — the signature ending in `, String>` is
// the pre-change line and the one directly below it is its replacement.
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind, String> {
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind> {
self.build(intermediate_keys).map(ProcessorKind::Join)
}
}
impl JoinProcessorBuilder {
/// Validates that a separator has been configured, returning the builder unchanged.
///
/// Takes `self` by value and hands it back in `Ok`, so it can sit in a builder chain.
///
/// # Errors
///
/// In the post-change lines: `JoinSeparatorRequiredSnafu` when `separator` is `None`.
// NOTE(review): diff rendering without +/- markers — `return Err("…".to_string())` is the
// removed line and `return JoinSeparatorRequiredSnafu.fail()` is its replacement.
fn check(self) -> Result<Self, String> {
fn check(self) -> Result<Self> {
if self.separator.is_none() {
return Err("separator is required".to_string());
return JoinSeparatorRequiredSnafu.fail();
}
Ok(self)
}
pub fn build(self, intermediate_keys: &[String]) -> Result<JoinProcessor, String> {
pub fn build(self, intermediate_keys: &[String]) -> Result<JoinProcessor> {
let mut real_fields = vec![];
for field in self.fields.into_iter() {
let input = OneInputOneOutputField::build(
@@ -85,7 +90,7 @@ pub struct JoinProcessor {
}
impl JoinProcessor {
fn process(&self, arr: &Array) -> Result<Value, String> {
fn process(&self, arr: &Array) -> Result<Value> {
let sep = self.separator.as_ref().unwrap();
let val = arr
.iter()
@@ -96,9 +101,9 @@ impl JoinProcessor {
Ok(Value::String(val))
}
fn check(self) -> Result<Self, String> {
fn check(self) -> Result<Self> {
if self.separator.is_none() {
return Err("separator is required".to_string());
return JoinSeparatorRequiredSnafu.fail();
}
Ok(self)
@@ -106,9 +111,9 @@ impl JoinProcessor {
}
impl TryFrom<&yaml_rust::yaml::Hash> for JoinProcessorBuilder {
type Error = String;
type Error = Error;
fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
let mut fields = Fields::default();
let mut separator = None;
let mut ignore_missing = false;
@@ -116,7 +121,8 @@ impl TryFrom<&yaml_rust::yaml::Hash> for JoinProcessorBuilder {
for (k, v) in value.iter() {
let key = k
.as_str()
.ok_or(format!("key must be a string, but got {k:?}"))?;
.with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
match key {
FIELD_NAME => {
fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
@@ -152,7 +158,7 @@ impl Processor for JoinProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<(), String> {
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<()> {
for field in self.fields.iter() {
let index = field.input_index();
match val.get(index) {
@@ -163,18 +169,19 @@ impl Processor for JoinProcessor {
}
Some(Value::Null) | None => {
if !self.ignore_missing {
return Err(format!(
"{} processor: missing field: {}",
self.kind(),
field.input_name()
));
return ProcessorMissingFieldSnafu {
processor: self.kind(),
field: field.input_name(),
}
.fail();
}
}
Some(v) => {
return Err(format!(
"{} processor: expect string value, but got {v:?}",
self.kind()
));
return ProcessorExpectStringSnafu {
processor: self.kind(),
v: v.clone(),
}
.fail();
}
}
}

View File

@@ -13,7 +13,12 @@
// limitations under the License.
use ahash::HashSet;
use snafu::OptionExt;
use crate::etl::error::{
Error, KeyMustBeStringSnafu, LetterInvalidMethodSnafu, ProcessorExpectStringSnafu,
ProcessorMissingFieldSnafu, Result,
};
use crate::etl::field::{Fields, OneInputOneOutputField};
use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, ProcessorBuilder,
@@ -42,14 +47,14 @@ impl std::fmt::Display for Method {
}
/// Parses a letter-case method name into a `Method`.
///
/// The input is lower-cased before matching, so `"upper"`, `"lower"` and `"capital"` are
/// accepted in any letter case.
// NOTE(review): diff rendering without +/- markers — `type Err = String`, the
// `Result<Self, Self::Err>` signature and the `Err(format!(…))` arm are the removed lines;
// each is followed by its replacement.
impl std::str::FromStr for Method {
type Err = String;
type Err = Error;
/// Returns the matching variant, or (post-change) a `LetterInvalidMethodSnafu` error
/// carrying the original, un-lowercased input `s`.
fn from_str(s: &str) -> Result<Self, Self::Err> {
fn from_str(s: &str) -> Result<Self> {
match s.to_lowercase().as_str() {
"upper" => Ok(Method::Upper),
"lower" => Ok(Method::Lower),
"capital" => Ok(Method::Capital),
_ => Err(format!("invalid method: {s}")),
_ => LetterInvalidMethodSnafu { method: s }.fail(),
}
}
}
@@ -73,13 +78,13 @@ impl ProcessorBuilder for LetterProcessorBuilder {
self.fields.iter().map(|f| f.input_field()).collect()
}
/// Builds the letter processor and wraps it in `ProcessorKind::Letter`.
///
/// Delegates to `self.build(intermediate_keys)` (the `LetterProcessorBuilder` impl below
/// declares a `build` returning `LetterProcessor`) and forwards any error unchanged through `map`.
// NOTE(review): diff rendering without +/- markers — the signature ending in `, String>` is
// the pre-change line and the one directly below it is its replacement.
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind, String> {
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind> {
self.build(intermediate_keys).map(ProcessorKind::Letter)
}
}
impl LetterProcessorBuilder {
pub fn build(self, intermediate_keys: &[String]) -> Result<LetterProcessor, String> {
pub fn build(self, intermediate_keys: &[String]) -> Result<LetterProcessor> {
let mut real_fields = vec![];
for field in self.fields.into_iter() {
let input = OneInputOneOutputField::build(
@@ -108,7 +113,7 @@ pub struct LetterProcessor {
}
impl LetterProcessor {
fn process_field(&self, val: &str) -> Result<Value, String> {
fn process_field(&self, val: &str) -> Result<Value> {
let processed = match self.method {
Method::Upper => val.to_uppercase(),
Method::Lower => val.to_lowercase(),
@@ -121,9 +126,9 @@ impl LetterProcessor {
}
impl TryFrom<&yaml_rust::yaml::Hash> for LetterProcessorBuilder {
type Error = String;
type Error = Error;
fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
let mut fields = Fields::default();
let mut method = Method::Lower;
let mut ignore_missing = false;
@@ -131,7 +136,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for LetterProcessorBuilder {
for (k, v) in value.iter() {
let key = k
.as_str()
.ok_or(format!("key must be a string, but got {k:?}"))?;
.with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
match key {
FIELD_NAME => {
fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
@@ -166,7 +171,7 @@ impl Processor for LetterProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<(), String> {
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<()> {
for field in self.fields.iter() {
let index = field.input_index();
match val.get(index) {
@@ -177,18 +182,19 @@ impl Processor for LetterProcessor {
}
Some(Value::Null) | None => {
if !self.ignore_missing {
return Err(format!(
"{} processor: missing field: {}",
self.kind(),
&field.input().name
));
return ProcessorMissingFieldSnafu {
processor: self.kind(),
field: field.input_name(),
}
.fail();
}
}
Some(v) => {
return Err(format!(
"{} processor: expect string value, but got {v:?}",
self.kind()
));
return ProcessorExpectStringSnafu {
processor: self.kind(),
v: v.clone(),
}
.fail();
}
}
}

View File

@@ -21,7 +21,13 @@ pub(crate) const PROCESSOR_REGEX: &str = "regex";
use ahash::{HashSet, HashSetExt};
use lazy_static::lazy_static;
use regex::Regex;
use snafu::{OptionExt, ResultExt};
use crate::etl::error::{
Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu,
RegexNamedGroupNotFoundSnafu, RegexNoValidFieldSnafu, RegexNoValidPatternSnafu, RegexSnafu,
Result,
};
use crate::etl::field::{Fields, InputFieldInfo, OneInputMultiOutputField};
use crate::etl::find_key_index;
use crate::etl::processor::{
@@ -60,15 +66,15 @@ impl std::fmt::Display for GroupRegex {
}
impl std::str::FromStr for GroupRegex {
type Err = String;
type Err = Error;
fn from_str(origin: &str) -> Result<Self, Self::Err> {
fn from_str(origin: &str) -> Result<Self> {
let groups = get_regex_group_names(origin);
if groups.is_empty() {
return Err(format!("no named group found in regex {origin}"));
return RegexNamedGroupNotFoundSnafu { origin }.fail();
}
let regex = Regex::new(origin).map_err(|e| e.to_string())?;
let regex = Regex::new(origin).context(RegexSnafu { pattern: origin })?;
Ok(GroupRegex {
origin: origin.into(),
regex,
@@ -94,25 +100,25 @@ impl ProcessorBuilder for RegexProcessorBuilder {
self.fields.iter().map(|f| f.input_field()).collect()
}
/// Consumes the builder, calls `self.build(intermediate_keys)` and wraps a
/// successful result in `ProcessorKind::Regex`; errors are passed through.
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind, String> {
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind> {
self.build(intermediate_keys).map(ProcessorKind::Regex)
}
}
impl RegexProcessorBuilder {
fn check(self) -> Result<Self, String> {
fn check(self) -> Result<Self> {
if self.fields.is_empty() {
return Err(format!(
"no valid field found in {} processor",
PROCESSOR_REGEX
));
return RegexNoValidFieldSnafu {
processor: PROCESSOR_REGEX,
}
.fail();
}
if self.patterns.is_empty() {
return Err(format!(
"no valid pattern found in {} processor",
PROCESSOR_REGEX
));
return RegexNoValidPatternSnafu {
processor: PROCESSOR_REGEX,
}
.fail();
}
Ok(self)
@@ -122,7 +128,7 @@ impl RegexProcessorBuilder {
group_regex: &GroupRegex,
om_field: &OneInputMultiOutputField,
intermediate_keys: &[String],
) -> Result<Vec<OutPutInfo>, String> {
) -> Result<Vec<OutPutInfo>> {
group_regex
.groups
.iter()
@@ -135,35 +141,35 @@ impl RegexProcessorBuilder {
index,
})
})
.collect::<Result<Vec<_>, String>>()
.collect::<Result<Vec<_>>>()
}
/// Builds the output info for one input field against every pattern.
///
/// Calls `build_group_output_info` once per entry in `patterns`, in order,
/// and collects the results into one `Vec<OutPutInfo>` per pattern. The
/// first pattern that fails aborts the collection and its error is returned.
fn build_group_output_infos(
patterns: &[GroupRegex],
om_field: &OneInputMultiOutputField,
intermediate_keys: &[String],
) -> Result<Vec<Vec<OutPutInfo>>, String> {
) -> Result<Vec<Vec<OutPutInfo>>> {
patterns
.iter()
.map(|group_regex| {
Self::build_group_output_info(group_regex, om_field, intermediate_keys)
})
.collect::<Result<Vec<_>, String>>()
.collect::<Result<Vec<_>>>()
}
/// Builds the complete `RegexProcessorOutputInfo` for the processor.
///
/// For each field in `real_fields` (outer level) this calls
/// `build_group_output_infos` with all `patterns`, so the resulting `inner`
/// value is nested as field -> pattern -> output infos. The first failing
/// field short-circuits and its error is returned.
fn build_output_info(
real_fields: &[OneInputMultiOutputField],
patterns: &[GroupRegex],
intermediate_keys: &[String],
) -> Result<RegexProcessorOutputInfo, String> {
) -> Result<RegexProcessorOutputInfo> {
let inner = real_fields
.iter()
.map(|om_field| Self::build_group_output_infos(patterns, om_field, intermediate_keys))
.collect::<Result<Vec<_>, String>>();
.collect::<Result<Vec<_>>>();
// Wrap the nested vectors only when every field succeeded.
inner.map(|inner| RegexProcessorOutputInfo { inner })
}
fn build(self, intermediate_keys: &[String]) -> Result<RegexProcessor, String> {
fn build(self, intermediate_keys: &[String]) -> Result<RegexProcessor> {
let mut real_fields = vec![];
for field in self.fields.into_iter() {
let input_index = find_key_index(intermediate_keys, field.input_field(), "regex")?;
@@ -184,9 +190,9 @@ impl RegexProcessorBuilder {
}
impl TryFrom<&yaml_rust::yaml::Hash> for RegexProcessorBuilder {
type Error = String;
type Error = Error;
fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
let mut fields = Fields::default();
let mut patterns: Vec<GroupRegex> = vec![];
let mut ignore_missing = false;
@@ -194,7 +200,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for RegexProcessorBuilder {
for (k, v) in value.iter() {
let key = k
.as_str()
.ok_or(format!("key must be a string, but got {k:?}"))?;
.with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
match key {
FIELD_NAME => {
fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
@@ -275,7 +281,7 @@ pub struct RegexProcessor {
}
impl RegexProcessor {
fn try_with_patterns(&mut self, patterns: Vec<String>) -> Result<(), String> {
fn try_with_patterns(&mut self, patterns: Vec<String>) -> Result<()> {
let mut rs = vec![];
for pattern in patterns {
let gr = pattern.parse()?;
@@ -290,7 +296,7 @@ impl RegexProcessor {
val: &str,
gr: &GroupRegex,
index: (usize, usize),
) -> Result<Vec<(usize, Value)>, String> {
) -> Result<Vec<(usize, Value)>> {
let mut result = Vec::new();
if let Some(captures) = gr.regex.captures(val) {
for (group_index, group) in gr.groups.iter().enumerate() {
@@ -316,7 +322,7 @@ impl Processor for RegexProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<(), String> {
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<()> {
for (field_index, field) in self.fields.iter().enumerate() {
let index = field.input_index();
let mut result_list = None;
@@ -346,18 +352,19 @@ impl Processor for RegexProcessor {
}
Some(Value::Null) | None => {
if !self.ignore_missing {
return Err(format!(
"{} processor: missing field: {}",
self.kind(),
field.input_name()
));
return ProcessorMissingFieldSnafu {
processor: self.kind(),
field: field.input_name(),
}
.fail();
}
}
Some(v) => {
return Err(format!(
"{} processor: expect string value, but got {v:?}",
self.kind()
));
return ProcessorExpectStringSnafu {
processor: self.kind(),
v: v.clone(),
}
.fail();
}
}
// safety here

View File

@@ -18,7 +18,14 @@ use ahash::HashSet;
use chrono::{DateTime, NaiveDateTime};
use chrono_tz::Tz;
use lazy_static::lazy_static;
use snafu::{OptionExt, ResultExt};
use crate::etl::error::{
DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateInvalidFormatSnafu,
DateParseSnafu, DateParseTimezoneSnafu, EpochInvalidResolutionSnafu, Error,
KeyMustBeStringSnafu, ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu,
ProcessorUnsupportedValueSnafu, Result,
};
use crate::etl::field::{Fields, OneInputOneOutputField};
use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor,
@@ -69,15 +76,15 @@ enum Resolution {
}
/// Converts a resolution name into a [`Resolution`].
///
/// Each variant accepts three aliases (the `*_RESOLUTION` constants grouped
/// in the match arms below). The comparison is an exact string match, so
/// input that equals none of the constants yields an "invalid resolution"
/// error carrying the input.
impl TryFrom<&str> for Resolution {
type Error = String;
type Error = Error;
fn try_from(s: &str) -> Result<Self, Self::Error> {
fn try_from(s: &str) -> Result<Self> {
match s {
SECOND_RESOLUTION | SEC_RESOLUTION | S_RESOLUTION => Ok(Resolution::Second),
MILLISECOND_RESOLUTION | MILLI_RESOLUTION | MS_RESOLUTION => Ok(Resolution::Milli),
MICROSECOND_RESOLUTION | MICRO_RESOLUTION | US_RESOLUTION => Ok(Resolution::Micro),
NANOSECOND_RESOLUTION | NANO_RESOLUTION | NS_RESOLUTION => Ok(Resolution::Nano),
_ => Err(format!("invalid resolution: {s}")),
_ => EpochInvalidResolutionSnafu { resolution: s }.fail(),
}
}
}
@@ -127,13 +134,13 @@ impl ProcessorBuilder for TimestampProcessorBuilder {
self.fields.iter().map(|f| f.input_field()).collect()
}
/// Consumes the builder, calls `self.build(intermediate_keys)` and wraps a
/// successful result in `ProcessorKind::Timestamp`; errors are passed through.
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind, String> {
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind> {
self.build(intermediate_keys).map(ProcessorKind::Timestamp)
}
}
impl TimestampProcessorBuilder {
pub fn build(self, intermediate_keys: &[String]) -> Result<TimestampProcessor, String> {
pub fn build(self, intermediate_keys: &[String]) -> Result<TimestampProcessor> {
let mut real_fields = vec![];
for field in self.fields.into_iter() {
let input = OneInputOneOutputField::build(
@@ -169,29 +176,37 @@ pub struct TimestampProcessor {
impl TimestampProcessor {
/// Parses `val` with the format string `fmt` and returns the value of
/// `timestamp_nanos_opt()` for the parsed instant.
///
/// The value is first parsed with `DateTime::parse_from_str`, i.e. as a
/// date-time that carries its own timezone/offset. If that fails, it is
/// parsed as a `NaiveDateTime` (no timezone) and interpreted in `tz`.
///
/// Errors when:
/// - the naive parse fails as well,
/// - `and_local_timezone(tz).single()` does not yield exactly one instant,
/// - `timestamp_nanos_opt()` returns `None` for the parsed instant.
fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result<i64, String> {
fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result<i64> {
if let Ok(dt) = DateTime::parse_from_str(val, fmt) {
Ok(dt.timestamp_nanos_opt().ok_or("failed to get timestamp")?)
Ok(dt
.timestamp_nanos_opt()
.context(DateFailedToGetTimestampSnafu)?)
} else {
// Fallback: the input has no offset, so attach the configured timezone.
let dt = NaiveDateTime::parse_from_str(val, fmt)
.map_err(|e| e.to_string())?
.context(DateParseSnafu { value: val })?
.and_local_timezone(tz)
.single()
.ok_or("failed to get local timezone")?;
Ok(dt.timestamp_nanos_opt().ok_or("failed to get timestamp")?)
.context(DateFailedToGetLocalTimezoneSnafu)?;
Ok(dt
.timestamp_nanos_opt()
.context(DateFailedToGetTimestampSnafu)?)
}
}
/// Parses `val` using the processor's configured formats.
///
/// Each `(format, timezone)` pair in `self.formats` is tried in order via
/// `Self::try_parse`; the first success is returned. Errors from individual
/// attempts are discarded, so when no format matches the caller only gets a
/// generic "failed to parse" error that includes `val`.
fn parse_time_str(&self, val: &str) -> Result<i64, String> {
fn parse_time_str(&self, val: &str) -> Result<i64> {
for (fmt, tz) in self.formats.iter() {
if let Ok(ns) = Self::try_parse(val, fmt, *tz) {
return Ok(ns);
}
}
Err(format!("{} processor: failed to parse {val}", self.kind(),))
ProcessorFailedToParseStringSnafu {
kind: PROCESSOR_TIMESTAMP,
value: val.to_string(),
}
.fail()
}
fn parse(&self, val: &Value) -> Result<Timestamp, String> {
fn parse(&self, val: &Value) -> Result<Timestamp> {
let t: i64 = match val {
Value::String(s) => {
let t = s.parse::<i64>();
@@ -221,9 +236,11 @@ impl TimestampProcessor {
},
_ => {
return Err(format!(
"{PROCESSOR_TIMESTAMP} processor: unsupported value {val}"
))
return ProcessorUnsupportedValueSnafu {
processor: PROCESSOR_TIMESTAMP,
val: val.to_string(),
}
.fail();
}
};
@@ -236,7 +253,7 @@ impl TimestampProcessor {
}
}
fn parse_formats(yaml: &yaml_rust::yaml::Yaml) -> Result<Vec<(Arc<String>, Tz)>, String> {
fn parse_formats(yaml: &yaml_rust::yaml::Yaml) -> Result<Vec<(Arc<String>, Tz)>> {
return match yaml.as_vec() {
Some(formats_yaml) => {
let mut formats = Vec::with_capacity(formats_yaml.len());
@@ -244,32 +261,38 @@ fn parse_formats(yaml: &yaml_rust::yaml::Yaml) -> Result<Vec<(Arc<String>, Tz)>,
let s = yaml_strings(v, FORMATS_NAME)
.or(yaml_string(v, FORMATS_NAME).map(|s| vec![s]))?;
if s.len() != 1 && s.len() != 2 {
return Err(format!(
"{PROCESSOR_TIMESTAMP} processor: invalid format {s:?}"
));
return DateInvalidFormatSnafu {
processor: PROCESSOR_TIMESTAMP,
s: format!("{s:?}"),
}
.fail();
}
let mut iter = s.into_iter();
// safety: unwrap is safe here
let formatter = iter.next().unwrap();
let tz = iter
.next()
.map(|tz| tz.parse::<Tz>())
.unwrap_or(Ok(Tz::UTC))
.map_err(|e| e.to_string())?;
.map(|tz| {
tz.parse::<Tz>()
.context(DateParseTimezoneSnafu { value: tz })
})
.unwrap_or(Ok(Tz::UTC))?;
formats.push((Arc::new(formatter), tz));
}
Ok(formats)
}
None => Err(format!(
"{PROCESSOR_TIMESTAMP} processor: invalid format {yaml:?}"
)),
None => DateInvalidFormatSnafu {
processor: PROCESSOR_TIMESTAMP,
s: format!("{yaml:?}"),
}
.fail(),
};
}
impl TryFrom<&yaml_rust::yaml::Hash> for TimestampProcessorBuilder {
type Error = String;
type Error = Error;
fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
let mut fields = Fields::default();
let mut formats = Formats::default();
let mut resolution = Resolution::default();
@@ -278,7 +301,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for TimestampProcessorBuilder {
for (k, v) in hash {
let key = k
.as_str()
.ok_or(format!("key must be a string, but got {k:?}"))?;
.with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
match key {
FIELD_NAME => {
@@ -321,17 +344,17 @@ impl Processor for TimestampProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<(), String> {
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<()> {
for field in self.fields.iter() {
let index = field.input().index;
match val.get(index) {
Some(Value::Null) | None => {
if !self.ignore_missing {
return Err(format!(
"{} processor: missing field: {}",
self.kind(),
&field.input().name
));
return ProcessorMissingFieldSnafu {
processor: self.kind(),
field: field.input_name(),
}
.fail();
}
}
Some(v) => {

View File

@@ -13,8 +13,13 @@
// limitations under the License.
use ahash::HashSet;
use snafu::{OptionExt, ResultExt};
use urlencoding::{decode, encode};
use crate::etl::error::{
Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
UrlEncodingDecodeSnafu, UrlEncodingInvalidMethodSnafu,
};
use crate::etl::field::{Fields, OneInputOneOutputField};
use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, ProcessorBuilder, ProcessorKind,
@@ -41,13 +46,13 @@ impl std::fmt::Display for Method {
}
/// Parses a URL-encoding method name into a [`Method`].
///
/// Only the exact, lowercase strings `decode` and `encode` are accepted
/// (the input is not case-normalized here). Anything else yields an
/// "invalid method" error carrying the input.
impl std::str::FromStr for Method {
type Err = String;
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
fn from_str(s: &str) -> Result<Self> {
match s {
"decode" => Ok(Method::Decode),
"encode" => Ok(Method::Encode),
_ => Err(format!("invalid method: {s}")),
_ => UrlEncodingInvalidMethodSnafu { s }.fail(),
}
}
}
@@ -71,14 +76,14 @@ impl ProcessorBuilder for UrlEncodingProcessorBuilder {
self.fields.iter().map(|f| f.input_field()).collect()
}
/// Consumes the builder, calls `self.build(intermediate_keys)` and wraps a
/// successful result in `ProcessorKind::UrlEncoding`; errors are passed through.
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind, String> {
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind> {
self.build(intermediate_keys)
.map(ProcessorKind::UrlEncoding)
}
}
impl UrlEncodingProcessorBuilder {
fn build(self, intermediate_keys: &[String]) -> Result<UrlEncodingProcessor, String> {
fn build(self, intermediate_keys: &[String]) -> Result<UrlEncodingProcessor> {
let mut real_fields = vec![];
for field in self.fields.into_iter() {
let input = OneInputOneOutputField::build(
@@ -106,19 +111,19 @@ pub struct UrlEncodingProcessor {
}
impl UrlEncodingProcessor {
/// Applies the configured method to a single string value.
///
/// `Method::Encode` passes `val` through `urlencoding::encode`;
/// `Method::Decode` passes it through `urlencoding::decode` and returns an
/// error if decoding fails. The result is always wrapped in `Value::String`.
fn process_field(&self, val: &str) -> Result<Value, String> {
fn process_field(&self, val: &str) -> Result<Value> {
let processed = match self.method {
Method::Encode => encode(val).to_string(),
Method::Decode => decode(val).map_err(|e| e.to_string())?.into_owned(),
Method::Decode => decode(val).context(UrlEncodingDecodeSnafu)?.into_owned(),
};
Ok(Value::String(processed))
}
}
impl TryFrom<&yaml_rust::yaml::Hash> for UrlEncodingProcessorBuilder {
type Error = String;
type Error = Error;
fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
let mut fields = Fields::default();
let mut method = Method::Decode;
let mut ignore_missing = false;
@@ -126,7 +131,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for UrlEncodingProcessorBuilder {
for (k, v) in value.iter() {
let key = k
.as_str()
.ok_or(format!("key must be a string, but got {k:?}"))?;
.with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
match key {
FIELD_NAME => {
fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
@@ -166,7 +171,7 @@ impl crate::etl::processor::Processor for UrlEncodingProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<(), String> {
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<()> {
for field in self.fields.iter() {
let index = field.input_index();
match val.get(index) {
@@ -177,18 +182,19 @@ impl crate::etl::processor::Processor for UrlEncodingProcessor {
}
Some(Value::Null) | None => {
if !self.ignore_missing {
return Err(format!(
"{} processor: missing field: {}",
self.kind(),
field.output_name()
));
return ProcessorMissingFieldSnafu {
processor: self.kind(),
field: field.input_name(),
}
.fail();
}
}
Some(v) => {
return Err(format!(
"{} processor: expect string value, but got {v:?}",
self.kind()
));
return ProcessorExpectStringSnafu {
processor: self.kind(),
v: v.clone(),
}
.fail();
}
}
}

View File

@@ -16,7 +16,9 @@ pub mod index;
pub mod transformer;
use itertools::Itertools;
use snafu::OptionExt;
use crate::etl::error::{Error, Result};
use crate::etl::find_key_index;
use crate::etl::processor::yaml_string;
use crate::etl::transform::index::Index;
@@ -31,6 +33,10 @@ const TRANSFORM_ON_FAILURE: &str = "on_failure";
pub use transformer::greptime::GreptimeTransformer;
use super::error::{
KeyMustBeStringSnafu, TransformElementMustBeMapSnafu, TransformOnFailureInvalidValueSnafu,
TransformTypeMustBeSetSnafu,
};
use super::field::{Fields, InputFieldInfo, OneInputOneOutputField};
use super::processor::{yaml_new_field, yaml_new_fields};
@@ -38,11 +44,11 @@ pub trait Transformer: std::fmt::Display + Sized + Send + Sync + 'static {
type Output;
type VecOutput;
fn new(transforms: Transforms) -> Result<Self, String>;
fn new(transforms: Transforms) -> Result<Self>;
fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema>;
fn transforms(&self) -> &Transforms;
fn transforms_mut(&mut self) -> &mut Transforms;
fn transform_mut(&self, val: &mut Vec<Value>) -> Result<Self::VecOutput, String>;
fn transform_mut(&self, val: &mut Vec<Value>) -> Result<Self::VecOutput>;
}
/// On Failure behavior when transform fails
@@ -57,13 +63,13 @@ pub enum OnFailure {
}
/// Parses the `on_failure` setting of a transform into an [`OnFailure`].
///
/// Only the exact strings `ignore` and `default` are accepted; any other
/// input yields an "invalid transform on_failure value" error carrying the
/// input.
impl std::str::FromStr for OnFailure {
type Err = String;
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
fn from_str(s: &str) -> Result<Self> {
match s {
"ignore" => Ok(OnFailure::Ignore),
"default" => Ok(OnFailure::Default),
_ => Err(format!("invalid transform on_failure value: {}", s)),
_ => TransformOnFailureInvalidValueSnafu { value: s }.fail(),
}
}
}
@@ -139,16 +145,16 @@ impl std::ops::DerefMut for Transforms {
}
impl TryFrom<&Vec<yaml_rust::Yaml>> for TransformBuilders {
type Error = String;
type Error = Error;
fn try_from(docs: &Vec<yaml_rust::Yaml>) -> Result<Self, Self::Error> {
fn try_from(docs: &Vec<yaml_rust::Yaml>) -> Result<Self> {
let mut transforms = Vec::with_capacity(100);
let mut all_output_keys: Vec<String> = Vec::with_capacity(100);
let mut all_required_keys = Vec::with_capacity(100);
for doc in docs {
let transform_builder: TransformBuilder = doc
.as_hash()
.ok_or("transform element must be a map".to_string())?
.context(TransformElementMustBeMapSnafu)?
.try_into()?;
let mut transform_output_keys = transform_builder
.fields
@@ -187,11 +193,7 @@ pub struct TransformBuilder {
}
impl TransformBuilder {
pub fn build(
self,
intermediate_keys: &[String],
output_keys: &[String],
) -> Result<Transform, String> {
pub fn build(self, intermediate_keys: &[String], output_keys: &[String]) -> Result<Transform> {
let mut real_fields = vec![];
for field in self.fields {
let input_index = find_key_index(intermediate_keys, field.input_field(), "transform")?;
@@ -277,9 +279,9 @@ impl Transform {
}
impl TryFrom<&yaml_rust::yaml::Hash> for TransformBuilder {
type Error = String;
type Error = Error;
fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
let mut fields = Fields::default();
let mut type_ = Value::Null;
let mut default = None;
@@ -287,7 +289,9 @@ impl TryFrom<&yaml_rust::yaml::Hash> for TransformBuilder {
let mut on_failure = None;
for (k, v) in hash {
let key = k.as_str().ok_or("key must be a string")?;
let key = k
.as_str()
.with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
match key {
TRANSFORM_FIELD => {
fields = Fields::one(yaml_new_field(v, TRANSFORM_FIELD)?);
@@ -324,10 +328,11 @@ impl TryFrom<&yaml_rust::yaml::Hash> for TransformBuilder {
if let Some(default_value) = default {
match (&type_, &default_value) {
(Value::Null, _) => {
return Err(format!(
"transform {:?} type MUST BE set before default {}",
fields, &default_value,
));
return TransformTypeMustBeSetSnafu {
fields: format!("{:?}", fields),
default: default_value.to_string(),
}
.fail();
}
(_, Value::Null) => {} // if default is not set, then it will be regarded as default null
(_, _) => {

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::etl::error::{Error, Result, UnsupportedIndexTypeSnafu};
const INDEX_TIMESTAMP: &str = "timestamp";
const INDEX_TIMEINDEX: &str = "time";
const INDEX_TAG: &str = "tag";
@@ -38,22 +40,22 @@ impl std::fmt::Display for Index {
}
/// Owned-string convenience conversion: borrows `value` as `&str` and
/// delegates to the `TryFrom<&str>` implementation, so both accept exactly
/// the same inputs and fail the same way.
impl TryFrom<String> for Index {
type Error = String;
type Error = Error;
fn try_from(value: String) -> Result<Self, Self::Error> {
fn try_from(value: String) -> Result<Self> {
Index::try_from(value.as_str())
}
}
/// Converts an index type name into an [`Index`].
///
/// `INDEX_TIMESTAMP` ("timestamp") and `INDEX_TIMEINDEX` ("time") both map to
/// `Index::Time`; `INDEX_TAG` ("tag") maps to `Index::Tag`; `INDEX_FULLTEXT`
/// maps to `Index::Fulltext`. The match is exact; any other input yields an
/// "unsupported index type" error carrying the input.
impl TryFrom<&str> for Index {
type Error = String;
type Error = Error;
fn try_from(value: &str) -> Result<Self, Self::Error> {
fn try_from(value: &str) -> Result<Self> {
match value {
INDEX_TIMESTAMP | INDEX_TIMEINDEX => Ok(Index::Time),
INDEX_TAG => Ok(Index::Tag),
INDEX_FULLTEXT => Ok(Index::Fulltext),
_ => Err(format!("unsupported index type: {}", value)),
_ => UnsupportedIndexTypeSnafu { value }.fail(),
}
}
}

View File

@@ -20,6 +20,10 @@ use coerce::{coerce_columns, coerce_value};
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use crate::etl::error::{
Result, TransformColumnNameMustBeUniqueSnafu, TransformEmptySnafu,
TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu,
};
use crate::etl::field::{InputFieldInfo, OneInputOneOutputField};
use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transformer, Transforms};
@@ -71,7 +75,7 @@ impl GreptimeTransformer {
}
/// Generate the schema for the GreptimeTransformer
fn schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>, String> {
fn schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
let mut schema = vec![];
for transform in transforms.iter() {
schema.extend(coerce_columns(transform)?);
@@ -90,9 +94,9 @@ impl Transformer for GreptimeTransformer {
type Output = Rows;
type VecOutput = Row;
fn new(mut transforms: Transforms) -> Result<Self, String> {
fn new(mut transforms: Transforms) -> Result<Self> {
if transforms.is_empty() {
return Err("transform cannot be empty".to_string());
return TransformEmptySnafu.fail();
}
let mut column_names_set = HashSet::new();
@@ -108,9 +112,7 @@ impl Transformer for GreptimeTransformer {
let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
if !intersections.is_empty() {
let duplicates = intersections.iter().join(",");
return Err(format!(
"column name must be unique, but got duplicated: {duplicates}"
));
return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
}
column_names_set.extend(target_fields_set);
@@ -121,10 +123,14 @@ impl Transformer for GreptimeTransformer {
1 => timestamp_columns
.push(transform.real_fields.first().unwrap().input_name()),
_ => {
return Err(format!(
"Illegal to set multiple timestamp Index columns, please set only one: {}",
transform.real_fields.iter().map(|x|x.input_name()).join(", ")
))
return TransformMultipleTimestampIndexSnafu {
columns: transform
.real_fields
.iter()
.map(|x| x.input_name())
.join(", "),
}
.fail();
}
}
}
@@ -145,14 +151,12 @@ impl Transformer for GreptimeTransformer {
_ => {
let columns: String = timestamp_columns.iter().map(|s| s.to_string()).join(", ");
let count = timestamp_columns.len();
Err(
format!("transform must have exactly one field specified as timestamp Index, but got {count}: {columns}")
)
TransformTimestampIndexCountSnafu { count, columns }.fail()
}
}
}
fn transform_mut(&self, val: &mut Vec<Value>) -> Result<Self::VecOutput, String> {
fn transform_mut(&self, val: &mut Vec<Value>) -> Result<Self::VecOutput> {
let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
for transform in self.transforms.iter() {
for field in transform.real_fields.iter() {
@@ -160,8 +164,7 @@ impl Transformer for GreptimeTransformer {
let output_index = field.output_index();
match val.get(index) {
Some(v) => {
let value_data = coerce_value(v, transform)
.map_err(|e| format!("{} processor: {}", field.input_name(), e))?;
let value_data = coerce_value(v, transform)?;
// every transform fields has only one output field
values[output_index] = GreptimeValue { value_data };
}

View File

@@ -17,17 +17,22 @@ use api::v1::ColumnOptions;
use datatypes::schema::FulltextOptions;
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
use snafu::ResultExt;
use crate::etl::error::{
CoerceStringToTypeSnafu, CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu,
CoerceUnsupportedNullTypeToSnafu, ColumnOptionsSnafu, Error, Result,
};
use crate::etl::transform::index::Index;
use crate::etl::transform::{OnFailure, Transform};
use crate::etl::value::{Timestamp, Value};
impl TryFrom<Value> for ValueData {
type Error = String;
type Error = Error;
fn try_from(value: Value) -> Result<Self, Self::Error> {
fn try_from(value: Value) -> Result<Self> {
match value {
Value::Null => Err("Null type not supported".to_string()),
Value::Null => CoerceUnsupportedNullTypeSnafu.fail(),
Value::Int8(v) => Ok(ValueData::I32Value(v as i32)),
Value::Int16(v) => Ok(ValueData::I32Value(v as i32)),
@@ -63,7 +68,7 @@ impl TryFrom<Value> for ValueData {
}
// TODO(yuanbohan): add fulltext support in datatype_extension
pub(crate) fn coerce_columns(transform: &Transform) -> Result<Vec<ColumnSchema>, String> {
pub(crate) fn coerce_columns(transform: &Transform) -> Result<Vec<ColumnSchema>> {
let mut columns = Vec::new();
for field in transform.real_fields.iter() {
@@ -94,19 +99,19 @@ fn coerce_semantic_type(transform: &Transform) -> SemanticType {
}
}
/// Derives the column options for a transform.
///
/// When the transform's index is `Index::Fulltext`, the options are produced
/// by `options_from_fulltext` from a `FulltextOptions` with `enable: true`
/// and all other fields left at their defaults; a failure there is returned
/// as an error. For every other index (or none) there are no options and
/// `Ok(None)` is returned.
fn coerce_options(transform: &Transform) -> Result<Option<ColumnOptions>, String> {
fn coerce_options(transform: &Transform) -> Result<Option<ColumnOptions>> {
if let Some(Index::Fulltext) = transform.index {
options_from_fulltext(&FulltextOptions {
enable: true,
..Default::default()
})
.map_err(|e| e.to_string())
.context(ColumnOptionsSnafu)
} else {
Ok(None)
}
}
fn coerce_type(transform: &Transform) -> Result<ColumnDataType, String> {
fn coerce_type(transform: &Transform) -> Result<ColumnDataType> {
match transform.type_ {
Value::Int8(_) => Ok(ColumnDataType::Int8),
Value::Int16(_) => Ok(ColumnDataType::Int16),
@@ -132,17 +137,14 @@ fn coerce_type(transform: &Transform) -> Result<ColumnDataType, String> {
Value::Array(_) => unimplemented!("Array"),
Value::Map(_) => unimplemented!("Object"),
Value::Null => Err(format!(
"Null type not supported when to coerce '{}' type",
transform.type_.to_str_type()
)),
Value::Null => CoerceUnsupportedNullTypeToSnafu {
ty: transform.type_.to_str_type(),
}
.fail(),
}
}
pub(crate) fn coerce_value(
val: &Value,
transform: &Transform,
) -> Result<Option<ValueData>, String> {
pub(crate) fn coerce_value(val: &Value, transform: &Transform) -> Result<Option<ValueData>> {
match val {
Value::Null => match &transform.default {
Some(default) => coerce_value(default, transform),
@@ -190,7 +192,7 @@ pub(crate) fn coerce_value(
}
}
fn coerce_bool_value(b: bool, transform: &Transform) -> Result<Option<ValueData>, String> {
fn coerce_bool_value(b: bool, transform: &Transform) -> Result<Option<ValueData>> {
let val = match transform.type_ {
Value::Int8(_) => ValueData::I8Value(b as i32),
Value::Int16(_) => ValueData::I16Value(b as i32),
@@ -211,9 +213,11 @@ fn coerce_bool_value(b: bool, transform: &Transform) -> Result<Option<ValueData>
Value::Timestamp(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Epoch".to_string())
return CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail();
}
None => {
return CoerceUnsupportedEpochTypeSnafu { ty: "Boolean" }.fail();
}
None => return Err("Boolean type not supported for Epoch".to_string()),
},
Value::Array(_) => unimplemented!("Array type not supported"),
@@ -225,7 +229,7 @@ fn coerce_bool_value(b: bool, transform: &Transform) -> Result<Option<ValueData>
Ok(Some(val))
}
fn coerce_i64_value(n: i64, transform: &Transform) -> Result<Option<ValueData>, String> {
fn coerce_i64_value(n: i64, transform: &Transform) -> Result<Option<ValueData>> {
let val = match transform.type_ {
Value::Int8(_) => ValueData::I8Value(n as i32),
Value::Int16(_) => ValueData::I16Value(n as i32),
@@ -246,9 +250,11 @@ fn coerce_i64_value(n: i64, transform: &Transform) -> Result<Option<ValueData>,
Value::Timestamp(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Epoch".to_string())
return CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail();
}
None => {
return CoerceUnsupportedEpochTypeSnafu { ty: "Integer" }.fail();
}
None => return Err("Integer type not supported for Epoch".to_string()),
},
Value::Array(_) => unimplemented!("Array type not supported"),
@@ -260,7 +266,7 @@ fn coerce_i64_value(n: i64, transform: &Transform) -> Result<Option<ValueData>,
Ok(Some(val))
}
fn coerce_u64_value(n: u64, transform: &Transform) -> Result<Option<ValueData>, String> {
fn coerce_u64_value(n: u64, transform: &Transform) -> Result<Option<ValueData>> {
let val = match transform.type_ {
Value::Int8(_) => ValueData::I8Value(n as i32),
Value::Int16(_) => ValueData::I16Value(n as i32),
@@ -281,9 +287,11 @@ fn coerce_u64_value(n: u64, transform: &Transform) -> Result<Option<ValueData>,
Value::Timestamp(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Epoch".to_string())
return CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail();
}
None => {
return CoerceUnsupportedEpochTypeSnafu { ty: "Integer" }.fail();
}
None => return Err("Integer type not supported for Epoch".to_string()),
},
Value::Array(_) => unimplemented!("Array type not supported"),
@@ -295,7 +303,7 @@ fn coerce_u64_value(n: u64, transform: &Transform) -> Result<Option<ValueData>,
Ok(Some(val))
}
fn coerce_f64_value(n: f64, transform: &Transform) -> Result<Option<ValueData>, String> {
fn coerce_f64_value(n: f64, transform: &Transform) -> Result<Option<ValueData>> {
let val = match transform.type_ {
Value::Int8(_) => ValueData::I8Value(n as i32),
Value::Int16(_) => ValueData::I16Value(n as i32),
@@ -316,9 +324,11 @@ fn coerce_f64_value(n: f64, transform: &Transform) -> Result<Option<ValueData>,
Value::Timestamp(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Epoch".to_string())
return CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail();
}
None => {
return CoerceUnsupportedEpochTypeSnafu { ty: "Float" }.fail();
}
None => return Err("Float type not supported for Epoch".to_string()),
},
Value::Array(_) => unimplemented!("Array type not supported"),
@@ -340,17 +350,17 @@ macro_rules! coerce_string_value {
Some(default) => coerce_value(default, $transform),
None => coerce_value($transform.get_type_matched_default_val(), $transform),
},
None => Err(format!(
"failed to coerce string value '{}' to type '{}'",
$s,
$transform.type_.to_str_type()
)),
None => CoerceStringToTypeSnafu {
s: $s,
ty: $transform.type_.to_str_type(),
}
.fail(),
},
}
};
}
fn coerce_string_value(s: &String, transform: &Transform) -> Result<Option<ValueData>, String> {
fn coerce_string_value(s: &String, transform: &Transform) -> Result<Option<ValueData>> {
match transform.type_ {
Value::Int8(_) => {
coerce_string_value!(s, transform, i32, I8Value)
@@ -393,8 +403,8 @@ fn coerce_string_value(s: &String, transform: &Transform) -> Result<Option<Value
Value::Timestamp(_) => match transform.on_failure {
Some(OnFailure::Ignore) => Ok(None),
Some(OnFailure::Default) => Err("default value not supported for Epoch".to_string()),
None => Err("String type not supported for Epoch".to_string()),
Some(OnFailure::Default) => CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail(),
None => CoerceUnsupportedEpochTypeSnafu { ty: "String" }.fail(),
},
Value::Array(_) => unimplemented!("Array type not supported"),

View File

@@ -19,8 +19,16 @@ pub mod time;
use ahash::{HashMap, HashMapExt};
pub use array::Array;
pub use map::Map;
use snafu::{OptionExt, ResultExt};
pub use time::Timestamp;
use super::error::{
ValueDefaultValueUnsupportedSnafu, ValueInvalidResolutionSnafu, ValueParseBooleanSnafu,
ValueParseFloatSnafu, ValueParseIntSnafu, ValueParseTypeSnafu, ValueUnsupportedNumberTypeSnafu,
ValueUnsupportedYamlTypeSnafu, ValueYamlKeyMustBeStringSnafu,
};
use crate::etl::error::{Error, Result};
/// Value can be used as type
/// acts as value: the enclosed value is the actual value
/// acts as type: the enclosed value is the default value
@@ -58,7 +66,7 @@ impl Value {
matches!(self, Value::Null)
}
pub fn parse_str_type(t: &str) -> Result<Self, String> {
pub fn parse_str_type(t: &str) -> Result<Self> {
let mut parts = t.splitn(2, ',');
let head = parts.next().unwrap_or_default();
let tail = parts.next().map(|s| s.trim().to_string());
@@ -93,10 +101,11 @@ impl Value {
time::SECOND_RESOLUTION | time::SEC_RESOLUTION | time::S_RESOLUTION => {
Ok(Value::Timestamp(Timestamp::Second(0)))
}
_ => Err(format!(
"invalid resolution: '{resolution}'. Available resolutions: {}",
time::VALID_RESOLUTIONS.join(",")
)),
_ => ValueInvalidResolutionSnafu {
resolution,
valid_resolution: time::VALID_RESOLUTIONS.join(","),
}
.fail(),
},
_ => Ok(Value::Timestamp(Timestamp::Nanosecond(0))),
},
@@ -104,65 +113,68 @@ impl Value {
"array" => Ok(Value::Array(Array::default())),
"map" => Ok(Value::Map(Map::default())),
_ => Err(format!("failed to parse type: '{t}'")),
_ => ValueParseTypeSnafu { t }.fail(),
}
}
/// only support string, bool, number, null
pub fn parse_str_value(&self, v: &str) -> Result<Self, String> {
pub fn parse_str_value(&self, v: &str) -> Result<Self> {
match self {
Value::Int8(_) => v
.parse::<i8>()
.map(Value::Int8)
.map_err(|e| format!("failed to parse int8: {}", e)),
.context(ValueParseIntSnafu { ty: "int8", v }),
Value::Int16(_) => v
.parse::<i16>()
.map(Value::Int16)
.map_err(|e| format!("failed to parse int16: {}", e)),
.context(ValueParseIntSnafu { ty: "int16", v }),
Value::Int32(_) => v
.parse::<i32>()
.map(Value::Int32)
.map_err(|e| format!("failed to parse int32: {}", e)),
.context(ValueParseIntSnafu { ty: "int32", v }),
Value::Int64(_) => v
.parse::<i64>()
.map(Value::Int64)
.map_err(|e| format!("failed to parse int64: {}", e)),
.context(ValueParseIntSnafu { ty: "int64", v }),
Value::Uint8(_) => v
.parse::<u8>()
.map(Value::Uint8)
.map_err(|e| format!("failed to parse uint8: {}", e)),
.context(ValueParseIntSnafu { ty: "uint8", v }),
Value::Uint16(_) => v
.parse::<u16>()
.map(Value::Uint16)
.map_err(|e| format!("failed to parse uint16: {}", e)),
.context(ValueParseIntSnafu { ty: "uint16", v }),
Value::Uint32(_) => v
.parse::<u32>()
.map(Value::Uint32)
.map_err(|e| format!("failed to parse uint32: {}", e)),
.context(ValueParseIntSnafu { ty: "uint32", v }),
Value::Uint64(_) => v
.parse::<u64>()
.map(Value::Uint64)
.map_err(|e| format!("failed to parse uint64: {}", e)),
.context(ValueParseIntSnafu { ty: "uint64", v }),
Value::Float32(_) => v
.parse::<f32>()
.map(Value::Float32)
.map_err(|e| format!("failed to parse float32: {}", e)),
.context(ValueParseFloatSnafu { ty: "float32", v }),
Value::Float64(_) => v
.parse::<f64>()
.map(Value::Float64)
.map_err(|e| format!("failed to parse float64: {}", e)),
.context(ValueParseFloatSnafu { ty: "float64", v }),
Value::Boolean(_) => v
.parse::<bool>()
.map(Value::Boolean)
.map_err(|e| format!("failed to parse bool: {}", e)),
.context(ValueParseBooleanSnafu { ty: "boolean", v }),
Value::String(_) => Ok(Value::String(v.to_string())),
Value::Null => Ok(Value::Null),
_ => Err(format!("default value not unsupported for type {}", self)),
_ => ValueDefaultValueUnsupportedSnafu {
value: format!("{:?}", self),
}
.fail(),
}
}
@@ -249,9 +261,9 @@ impl std::fmt::Display for Value {
}
impl TryFrom<serde_json::Value> for Value {
type Error = String;
type Error = Error;
fn try_from(v: serde_json::Value) -> Result<Self, Self::Error> {
fn try_from(v: serde_json::Value) -> Result<Self> {
match v {
serde_json::Value::Null => Ok(Value::Null),
serde_json::Value::Bool(v) => Ok(Value::Boolean(v)),
@@ -263,7 +275,7 @@ impl TryFrom<serde_json::Value> for Value {
} else if let Some(v) = v.as_f64() {
Ok(Value::Float64(v))
} else {
Err(format!("unsupported number type: {}", v))
ValueUnsupportedNumberTypeSnafu { value: v }.fail()
}
}
serde_json::Value::String(v) => Ok(Value::String(v)),
@@ -286,20 +298,17 @@ impl TryFrom<serde_json::Value> for Value {
}
impl TryFrom<&yaml_rust::Yaml> for Value {
type Error = String;
type Error = Error;
fn try_from(v: &yaml_rust::Yaml) -> Result<Self, Self::Error> {
fn try_from(v: &yaml_rust::Yaml) -> Result<Self> {
match v {
yaml_rust::Yaml::Null => Ok(Value::Null),
yaml_rust::Yaml::Boolean(v) => Ok(Value::Boolean(*v)),
yaml_rust::Yaml::Integer(v) => Ok(Value::Int64(*v)),
yaml_rust::Yaml::Real(v) => {
if let Ok(v) = v.parse() {
Ok(Value::Float64(v))
} else {
Err(format!("failed to parse float64: {}", v))
}
}
yaml_rust::Yaml::Real(v) => match v.parse::<f64>() {
Ok(v) => Ok(Value::Float64(v)),
Err(e) => Err(e).context(ValueParseFloatSnafu { ty: "float64", v }),
},
yaml_rust::Yaml::String(v) => Ok(Value::String(v.to_string())),
yaml_rust::Yaml::Array(arr) => {
let mut values = vec![];
@@ -313,12 +322,12 @@ impl TryFrom<&yaml_rust::Yaml> for Value {
for (k, v) in v {
let key = k
.as_str()
.ok_or(format!("key in Hash must be a string, but got {v:?}"))?;
.with_context(|| ValueYamlKeyMustBeStringSnafu { value: v.clone() })?;
values.insert(key.to_string(), Value::try_from(v)?);
}
Ok(Value::Map(Map { values }))
}
_ => Err(format!("unsupported yaml type: {v:?}")),
_ => ValueUnsupportedYamlTypeSnafu { value: v.clone() }.fail(),
}
}
}

View File

@@ -16,6 +16,7 @@ mod etl;
mod manager;
mod metrics;
pub use etl::error::Result;
pub use etl::processor::Processor;
pub use etl::transform::{GreptimeTransformer, Transformer};
pub use etl::value::{Array, Map, Value};

View File

@@ -32,14 +32,16 @@ pub enum Error {
#[snafu(display("Failed to insert pipeline to pipelines table"))]
InsertPipeline {
#[snafu(source)]
source: operator::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to parse pipeline: {}", reason))]
#[snafu(display("Failed to parse pipeline"))]
CompilePipeline {
reason: String,
#[snafu(source)]
source: crate::etl::error::Error,
#[snafu(implicit)]
location: Location,
},
@@ -56,6 +58,7 @@ pub enum Error {
CollectRecords {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
source: common_recordbatch::error::Error,
},
@@ -76,6 +79,7 @@ pub enum Error {
#[snafu(display("Failed to execute internal statement"))]
ExecuteInternalStatement {
#[snafu(source)]
source: query::error::Error,
#[snafu(implicit)]
location: Location,
@@ -83,6 +87,7 @@ pub enum Error {
#[snafu(display("Failed to create dataframe"))]
DataFrame {
#[snafu(source)]
source: query::error::Error,
#[snafu(implicit)]
location: Location,
@@ -90,6 +95,7 @@ pub enum Error {
#[snafu(display("General catalog error"))]
Catalog {
#[snafu(source)]
source: catalog::error::Error,
#[snafu(implicit)]
location: Location,
@@ -97,14 +103,16 @@ pub enum Error {
#[snafu(display("Failed to create table"))]
CreateTable {
#[snafu(source)]
source: operator::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to execute pipeline, reason: {}", reason))]
#[snafu(display("Failed to execute pipeline"))]
PipelineTransform {
reason: String,
#[snafu(source)]
source: crate::etl::error::Error,
#[snafu(implicit)]
location: Location,
},

View File

@@ -204,8 +204,7 @@ impl PipelineTable {
/// Compile a pipeline from a string.
pub fn compile_pipeline(pipeline: &str) -> Result<Pipeline<GreptimeTransformer>> {
let yaml_content = Content::Yaml(pipeline.into());
parse::<GreptimeTransformer>(&yaml_content)
.map_err(|e| CompilePipelineSnafu { reason: e }.build())
parse::<GreptimeTransformer>(&yaml_content).context(CompilePipelineSnafu)
}
/// Insert a pipeline into the pipeline table.

View File

@@ -279,5 +279,5 @@ transform:
let row = pipeline.exec_mut(&mut result);
assert!(row.is_err());
assert_eq!(row.err().unwrap(), "No matching pattern found");
assert_eq!(row.err().unwrap().to_string(), "No matching pattern found");
}

View File

@@ -115,19 +115,18 @@ pub async fn add_pipeline(
) -> Result<GreptimedbManageResponse> {
let start = Instant::now();
let handler = state.log_handler;
if pipeline_name.is_empty() {
return Err(InvalidParameterSnafu {
ensure!(
!pipeline_name.is_empty(),
InvalidParameterSnafu {
reason: "pipeline_name is required in path",
}
.build());
}
if payload.is_empty() {
return Err(InvalidParameterSnafu {
);
ensure!(
!payload.is_empty(),
InvalidParameterSnafu {
reason: "pipeline is required in body",
}
.build());
}
);
query_ctx.set_channel(Channel::Http);
let query_ctx = Arc::new(query_ctx);
@@ -252,12 +251,12 @@ pub async fn pipeline_dryrun(
let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;
if value.len() > 10 {
return Err(InvalidParameterSnafu {
ensure!(
value.len() <= 10,
InvalidParameterSnafu {
reason: "too many rows for dryrun",
}
.build());
}
);
query_ctx.set_channel(Channel::Http);
let query_ctx = Arc::new(query_ctx);
@@ -272,11 +271,11 @@ pub async fn pipeline_dryrun(
for v in value {
pipeline
.prepare(v, &mut intermediate_state)
.map_err(|reason| PipelineTransformSnafu { reason }.build())
.context(PipelineTransformSnafu)
.context(PipelineSnafu)?;
let r = pipeline
.exec_mut(&mut intermediate_state)
.map_err(|reason| PipelineTransformSnafu { reason }.build())
.context(PipelineTransformSnafu)
.context(PipelineSnafu)?;
results.push(r);
pipeline.reset_intermediate_state(&mut intermediate_state);
@@ -438,21 +437,21 @@ async fn ingest_logs_inner(
for v in pipeline_data {
pipeline
.prepare(v, &mut intermediate_state)
.map_err(|reason| {
.inspect_err(|_| {
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
PipelineTransformSnafu { reason }.build()
})
.context(PipelineTransformSnafu)
.context(PipelineSnafu)?;
let r = pipeline
.exec_mut(&mut intermediate_state)
.map_err(|reason| {
.inspect_err(|_| {
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
PipelineTransformSnafu { reason }.build()
})
.context(PipelineTransformSnafu)
.context(PipelineSnafu)?;
results.push(r);
pipeline.reset_intermediate_state(&mut intermediate_state);