refactor: use global type alias for pipeline input (#5568)

* refactor: use global type alias for pipeline input

* fmt: reformat
This commit is contained in:
Ning Sun
2025-02-19 02:41:33 -08:00
committed by GitHub
parent 7c65fddb30
commit 81da18e5df
23 changed files with 81 additions and 129 deletions

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use common_telemetry::debug;
use snafu::OptionExt;
use yaml_rust::Yaml;
@@ -22,7 +20,7 @@ use crate::etl::error::{
Error, FieldRequiredForDispatcherSnafu, Result, TableSuffixRequiredForDispatcherRuleSnafu,
ValueRequiredForDispatcherRuleSnafu,
};
use crate::Value;
use crate::{PipelineMap, Value};
const FIELD: &str = "field";
const TABLE_SUFFIX: &str = "table_suffix";
@@ -111,7 +109,7 @@ impl TryFrom<&Yaml> for Dispatcher {
impl Dispatcher {
/// Executes the dispatcher and returns the matched rule, if any
pub(crate) fn exec(&self, data: &BTreeMap<String, Value>) -> Option<&Rule> {
pub(crate) fn exec(&self, data: &PipelineMap) -> Option<&Rule> {
if let Some(value) = data.get(&self.field) {
for rule in &self.rules {
if rule.value == *value {

View File

@@ -20,14 +20,13 @@ pub mod processor;
pub mod transform;
pub mod value;
use std::collections::BTreeMap;
use std::sync::Arc;
use error::{
IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, YamlLoadSnafu, YamlParseSnafu,
};
use itertools::Itertools;
use processor::{IntermediateStatus, Processor, Processors};
use processor::{Processor, Processors};
use snafu::{ensure, OptionExt, ResultExt};
use transform::{Transformer, Transforms};
use value::Value;
@@ -43,6 +42,8 @@ const TRANSFORM: &str = "transform";
const TRANSFORMS: &str = "transforms";
const DISPATCHER: &str = "dispatcher";
pub type PipelineMap = std::collections::BTreeMap<String, Value>;
pub enum Content<'a> {
Json(&'a str),
Yaml(&'a str),
@@ -153,10 +154,10 @@ impl<O> PipelineExecOutput<O> {
}
}
pub fn json_to_intermediate_state(val: serde_json::Value) -> Result<IntermediateStatus> {
pub fn json_to_intermediate_state(val: serde_json::Value) -> Result<PipelineMap> {
match val {
serde_json::Value::Object(map) => {
let mut intermediate_state = BTreeMap::new();
let mut intermediate_state = PipelineMap::new();
for (k, v) in map {
intermediate_state.insert(k, Value::try_from(v)?);
}
@@ -166,9 +167,7 @@ pub fn json_to_intermediate_state(val: serde_json::Value) -> Result<Intermediate
}
}
pub fn json_array_to_intermediate_state(
val: Vec<serde_json::Value>,
) -> Result<Vec<IntermediateStatus>> {
pub fn json_array_to_intermediate_state(val: Vec<serde_json::Value>) -> Result<Vec<PipelineMap>> {
val.into_iter().map(json_to_intermediate_state).collect()
}
@@ -176,10 +175,7 @@ impl<T> Pipeline<T>
where
T: Transformer,
{
pub fn exec_mut(
&self,
val: &mut BTreeMap<String, Value>,
) -> Result<PipelineExecOutput<T::VecOutput>> {
pub fn exec_mut(&self, val: &mut PipelineMap) -> Result<PipelineExecOutput<T::VecOutput>> {
for processor in self.processors.iter() {
processor.exec_mut(val)?;
}
@@ -350,7 +346,7 @@ transform:
type: timestamp, ns
index: time"#;
let pipeline: Pipeline<GreptimeTransformer> = parse(&Content::Yaml(pipeline_str)).unwrap();
let mut payload = BTreeMap::new();
let mut payload = PipelineMap::new();
payload.insert("message".to_string(), Value::String(message));
let result = pipeline
.exec_mut(&mut payload)

View File

@@ -27,8 +27,6 @@ pub mod regex;
pub mod timestamp;
pub mod urlencoding;
use std::collections::BTreeMap;
use cmcd::CmcdProcessor;
use csv::CsvProcessor;
use date::DateProcessor;
@@ -51,8 +49,8 @@ use super::error::{
ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu,
};
use super::field::{Field, Fields};
use super::PipelineMap;
use crate::etl::error::{Error, Result};
use crate::etl::value::Value;
use crate::etl_error::UnsupportedProcessorSnafu;
const FIELD_NAME: &str = "field";
@@ -66,8 +64,6 @@ const TARGET_FIELDS_NAME: &str = "target_fields";
const JSON_PATH_NAME: &str = "json_path";
const JSON_PATH_RESULT_INDEX_NAME: &str = "result_index";
pub type IntermediateStatus = BTreeMap<String, Value>;
/// Processor trait defines the interface for all processors.
///
/// A processor is a transformation that can be applied to a field in a document
@@ -83,7 +79,7 @@ pub trait Processor: std::fmt::Debug + Send + Sync + 'static {
fn ignore_missing(&self) -> bool;
/// Execute the processor on a map which has been preprocessed by the pipeline
fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()>;
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()>;
}
#[derive(Debug)]

View File

@@ -16,12 +16,9 @@
//!
//! Refer to [`CmcdProcessor`] for more information.
use std::collections::BTreeMap;
use snafu::{OptionExt, ResultExt};
use urlencoding::decode;
use super::IntermediateStatus;
use crate::etl::error::{
CmcdMissingKeySnafu, CmcdMissingValueSnafu, Error, FailedToParseFloatKeySnafu,
FailedToParseIntKeySnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
@@ -33,6 +30,7 @@ use crate::etl::processor::{
IGNORE_MISSING_NAME,
};
use crate::etl::value::Value;
use crate::etl::PipelineMap;
pub(crate) const PROCESSOR_CMCD: &str = "cmcd";
@@ -161,8 +159,8 @@ impl CmcdProcessor {
format!("{}_{}", prefix, key)
}
fn parse(&self, name: &str, value: &str) -> Result<BTreeMap<String, Value>> {
let mut working_set = BTreeMap::new();
fn parse(&self, name: &str, value: &str) -> Result<PipelineMap> {
let mut working_set = PipelineMap::new();
let parts = value.split(',');
@@ -251,7 +249,7 @@ impl Processor for CmcdProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> {
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
for field in self.fields.iter() {
let name = field.input_field();
@@ -285,11 +283,9 @@ impl Processor for CmcdProcessor {
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use urlencoding::decode;
use super::CmcdProcessor;
use super::*;
use crate::etl::field::{Field, Fields};
use crate::etl::value::Value;
@@ -436,7 +432,7 @@ mod tests {
let expected = vec
.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect::<BTreeMap<String, Value>>();
.collect::<PipelineMap>();
let actual = processor.parse("prefix", &decoded).unwrap();
assert_eq!(actual, expected);

View File

@@ -14,8 +14,6 @@
// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/csv-processor.html
use std::collections::BTreeMap;
use csv::{ReaderBuilder, Trim};
use itertools::EitherOrBoth::{Both, Left, Right};
use itertools::Itertools;
@@ -31,6 +29,7 @@ use crate::etl::processor::{
IGNORE_MISSING_NAME,
};
use crate::etl::value::Value;
use crate::etl::PipelineMap;
pub(crate) const PROCESSOR_CSV: &str = "csv";
@@ -60,7 +59,7 @@ pub struct CsvProcessor {
impl CsvProcessor {
// process the csv format string to a map with target_fields as keys
fn process(&self, val: &str) -> Result<BTreeMap<String, Value>> {
fn process(&self, val: &str) -> Result<PipelineMap> {
let mut reader = self.reader.from_reader(val.as_bytes());
if let Some(result) = reader.records().next() {
@@ -190,7 +189,7 @@ impl Processor for CsvProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut BTreeMap<String, Value>) -> Result<()> {
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
for field in self.fields.iter() {
let name = field.input_field();
@@ -240,7 +239,7 @@ mod tests {
let result = processor.process("1,2").unwrap();
let values = [
let values: PipelineMap = [
("a".into(), Value::String("1".into())),
("b".into(), Value::String("2".into())),
]
@@ -266,7 +265,7 @@ mod tests {
let result = processor.process("1,2").unwrap();
let values = [
let values: PipelineMap = [
("a".into(), Value::String("1".into())),
("b".into(), Value::String("2".into())),
("c".into(), Value::Null),
@@ -291,7 +290,7 @@ mod tests {
let result = processor.process("1,2").unwrap();
let values = [
let values: PipelineMap = [
("a".into(), Value::String("1".into())),
("b".into(), Value::String("2".into())),
("c".into(), Value::String("default".into())),
@@ -317,7 +316,7 @@ mod tests {
let result = processor.process("1,2").unwrap();
let values = [
let values: PipelineMap = [
("a".into(), Value::String("1".into())),
("b".into(), Value::String("2".into())),
]

View File

@@ -19,7 +19,6 @@ use chrono_tz::Tz;
use lazy_static::lazy_static;
use snafu::{OptionExt, ResultExt};
use super::IntermediateStatus;
use crate::etl::error::{
DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateParseSnafu,
DateParseTimezoneSnafu, Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
@@ -31,6 +30,7 @@ use crate::etl::processor::{
FIELD_NAME, IGNORE_MISSING_NAME,
};
use crate::etl::value::{Timestamp, Value};
use crate::etl::PipelineMap;
pub(crate) const PROCESSOR_DATE: &str = "date";
@@ -194,7 +194,7 @@ impl Processor for DateProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> {
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {

View File

@@ -22,7 +22,6 @@ use once_cell::sync::Lazy;
use regex::Regex;
use snafu::OptionExt;
use super::IntermediateStatus;
use crate::etl::error::{
Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
};
@@ -31,6 +30,7 @@ use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME,
};
use crate::etl::value::Value;
use crate::etl::PipelineMap;
pub(crate) const PROCESSOR_DECOLORIZE: &str = "decolorize";
@@ -102,7 +102,7 @@ impl crate::etl::processor::Processor for DecolorizeProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> {
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {

View File

@@ -24,7 +24,6 @@ use std::borrow::Cow;
use regex::Regex;
use snafu::OptionExt;
use super::IntermediateStatus;
use crate::etl::error::{
Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
};
@@ -33,6 +32,7 @@ use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME,
};
use crate::etl::value::Value;
use crate::etl::PipelineMap;
use crate::etl_error::DigestPatternInvalidSnafu;
pub(crate) const PROCESSOR_DIGEST: &str = "digest";
@@ -201,7 +201,7 @@ impl crate::etl::processor::Processor for DigestProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> {
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {

View File

@@ -18,7 +18,6 @@ use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
use itertools::Itertools;
use snafu::OptionExt;
use super::IntermediateStatus;
use crate::etl::error::{
DissectAppendOrderAlreadySetSnafu, DissectConsecutiveNamesSnafu, DissectEmptyPatternSnafu,
DissectEndModifierAlreadySetSnafu, DissectInvalidPatternSnafu, DissectModifierAlreadySetSnafu,
@@ -32,6 +31,7 @@ use crate::etl::processor::{
Processor, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, PATTERNS_NAME, PATTERN_NAME,
};
use crate::etl::value::Value;
use crate::etl::PipelineMap;
pub(crate) const PROCESSOR_DISSECT: &str = "dissect";
@@ -601,7 +601,7 @@ impl Processor for DissectProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> {
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {

View File

@@ -14,7 +14,6 @@
use snafu::{OptionExt, ResultExt};
use super::IntermediateStatus;
use crate::etl::error::{
EpochInvalidResolutionSnafu, Error, FailedToParseIntSnafu, KeyMustBeStringSnafu,
ProcessorMissingFieldSnafu, ProcessorUnsupportedValueSnafu, Result,
@@ -30,6 +29,7 @@ use crate::etl::value::time::{
SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION,
};
use crate::etl::value::{Timestamp, Value};
use crate::etl::PipelineMap;
pub(crate) const PROCESSOR_EPOCH: &str = "epoch";
const RESOLUTION_NAME: &str = "resolution";
@@ -163,7 +163,7 @@ impl Processor for EpochProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> {
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {

View File

@@ -15,7 +15,6 @@
use regex::Regex;
use snafu::{OptionExt, ResultExt};
use super::IntermediateStatus;
use crate::etl::error::{
Error, GsubPatternRequiredSnafu, GsubReplacementRequiredSnafu, KeyMustBeStringSnafu,
ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, RegexSnafu, Result,
@@ -26,6 +25,7 @@ use crate::etl::processor::{
IGNORE_MISSING_NAME, PATTERN_NAME,
};
use crate::etl::value::Value;
use crate::etl::PipelineMap;
pub(crate) const PROCESSOR_GSUB: &str = "gsub";
@@ -118,7 +118,7 @@ impl crate::etl::processor::Processor for GsubProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> {
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {

View File

@@ -14,7 +14,6 @@
use snafu::OptionExt;
use super::IntermediateStatus;
use crate::etl::error::{
Error, JoinSeparatorRequiredSnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
ProcessorMissingFieldSnafu, Result,
@@ -25,6 +24,7 @@ use crate::etl::processor::{
IGNORE_MISSING_NAME, SEPARATOR_NAME,
};
use crate::etl::value::{Array, Value};
use crate::etl::PipelineMap;
pub(crate) const PROCESSOR_JOIN: &str = "join";
@@ -95,7 +95,7 @@ impl Processor for JoinProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> {
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {

View File

@@ -16,8 +16,8 @@ use jsonpath_rust::JsonPath;
use snafu::{OptionExt, ResultExt};
use super::{
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, IntermediateStatus, Processor,
FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, JSON_PATH_NAME, JSON_PATH_RESULT_INDEX_NAME,
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, PipelineMap, Processor, FIELDS_NAME,
FIELD_NAME, IGNORE_MISSING_NAME, JSON_PATH_NAME, JSON_PATH_RESULT_INDEX_NAME,
};
use crate::etl::error::{Error, Result};
use crate::etl::field::Fields;
@@ -126,7 +126,7 @@ impl Processor for JsonPathProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> {
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {

View File

@@ -14,7 +14,6 @@
use snafu::OptionExt;
use super::IntermediateStatus;
use crate::etl::error::{
Error, KeyMustBeStringSnafu, LetterInvalidMethodSnafu, ProcessorExpectStringSnafu,
ProcessorMissingFieldSnafu, Result,
@@ -25,6 +24,7 @@ use crate::etl::processor::{
IGNORE_MISSING_NAME, METHOD_NAME,
};
use crate::etl::value::Value;
use crate::etl::PipelineMap;
pub(crate) const PROCESSOR_LETTER: &str = "letter";
@@ -126,7 +126,7 @@ impl Processor for LetterProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> {
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {

View File

@@ -18,13 +18,10 @@ const PATTERNS_NAME: &str = "patterns";
pub(crate) const PROCESSOR_REGEX: &str = "regex";
use std::collections::BTreeMap;
use lazy_static::lazy_static;
use regex::Regex;
use snafu::{OptionExt, ResultExt};
use super::IntermediateStatus;
use crate::etl::error::{
Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu,
RegexNamedGroupNotFoundSnafu, RegexNoValidFieldSnafu, RegexNoValidPatternSnafu, RegexSnafu,
@@ -36,6 +33,7 @@ use crate::etl::processor::{
FIELD_NAME, IGNORE_MISSING_NAME, PATTERN_NAME,
};
use crate::etl::value::Value;
use crate::etl::PipelineMap;
lazy_static! {
static ref GROUPS_NAME_REGEX: Regex = Regex::new(r"\(\?P?<([[:word:]]+)>.+?\)").unwrap();
@@ -169,8 +167,8 @@ impl RegexProcessor {
Ok(())
}
fn process(&self, prefix: &str, val: &str) -> Result<BTreeMap<String, Value>> {
let mut result = BTreeMap::new();
fn process(&self, prefix: &str, val: &str) -> Result<PipelineMap> {
let mut result = PipelineMap::new();
for gr in self.patterns.iter() {
if let Some(captures) = gr.regex.captures(val) {
for group in gr.groups.iter() {
@@ -194,7 +192,7 @@ impl Processor for RegexProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> {
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
for field in self.fields.iter() {
let index = field.input_field();
let prefix = field.target_or_input_field();
@@ -227,11 +225,10 @@ impl Processor for RegexProcessor {
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use ahash::{HashMap, HashMapExt};
use itertools::Itertools;
use super::*;
use crate::etl::processor::regex::RegexProcessor;
use crate::etl::value::{Map, Value};
@@ -272,7 +269,7 @@ ignore_missing: false"#;
let cw = "[c=w,n=US_CA_SANJOSE,o=55155]";
let breadcrumbs_str = [cc, cg, co, cp, cw].iter().join(",");
let temporary_map: BTreeMap<String, Value> = [
let temporary_map: PipelineMap = [
("breadcrumbs_parent", Value::String(cc.to_string())),
("breadcrumbs_edge", Value::String(cg.to_string())),
("breadcrumbs_origin", Value::String(co.to_string())),

View File

@@ -19,7 +19,6 @@ use chrono_tz::Tz;
use lazy_static::lazy_static;
use snafu::{OptionExt, ResultExt};
use super::IntermediateStatus;
use crate::etl::error::{
DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateInvalidFormatSnafu,
DateParseSnafu, DateParseTimezoneSnafu, EpochInvalidResolutionSnafu, Error,
@@ -37,6 +36,7 @@ use crate::etl::value::time::{
SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION,
};
use crate::etl::value::{Timestamp, Value};
use crate::etl::PipelineMap;
pub(crate) const PROCESSOR_TIMESTAMP: &str = "timestamp";
const RESOLUTION_NAME: &str = "resolution";
@@ -298,7 +298,7 @@ impl Processor for TimestampProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> {
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use snafu::{OptionExt, ResultExt};
use urlencoding::{decode, encode};
@@ -27,6 +25,7 @@ use crate::etl::processor::{
IGNORE_MISSING_NAME, METHOD_NAME,
};
use crate::etl::value::Value;
use crate::PipelineMap;
pub(crate) const PROCESSOR_URL_ENCODING: &str = "urlencoding";
@@ -127,7 +126,7 @@ impl crate::etl::processor::Processor for UrlEncodingProcessor {
self.ignore_missing
}
fn exec_mut(&self, val: &mut BTreeMap<String, Value>) -> Result<()> {
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {

View File

@@ -15,8 +15,6 @@
pub mod index;
pub mod transformer;
use std::collections::BTreeMap;
use snafu::OptionExt;
use crate::etl::error::{Error, Result};
@@ -39,6 +37,7 @@ use super::error::{
use super::field::Fields;
use super::processor::{yaml_new_field, yaml_new_fields, yaml_string};
use super::value::Timestamp;
use super::PipelineMap;
pub trait Transformer: std::fmt::Debug + Sized + Send + Sync + 'static {
type Output;
@@ -48,7 +47,7 @@ pub trait Transformer: std::fmt::Debug + Sized + Send + Sync + 'static {
fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema>;
fn transforms(&self) -> &Transforms;
fn transforms_mut(&mut self) -> &mut Transforms;
fn transform_mut(&self, val: &mut BTreeMap<String, Value>) -> Result<Self::VecOutput>;
fn transform_mut(&self, val: &mut PipelineMap) -> Result<Self::VecOutput>;
}
/// On Failure behavior when transform fails

View File

@@ -14,7 +14,7 @@
pub mod coerce;
use std::collections::{BTreeMap, HashSet};
use std::collections::HashSet;
use std::sync::Arc;
use ahash::{HashMap, HashMapExt};
@@ -34,10 +34,10 @@ use crate::etl::error::{
UnsupportedNumberTypeSnafu,
};
use crate::etl::field::{Field, Fields};
use crate::etl::processor::IntermediateStatus;
use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transformer, Transforms};
use crate::etl::value::{Timestamp, Value};
use crate::etl::PipelineMap;
const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;
@@ -178,7 +178,7 @@ impl Transformer for GreptimeTransformer {
}
}
fn transform_mut(&self, val: &mut IntermediateStatus) -> Result<Self::VecOutput> {
fn transform_mut(&self, val: &mut PipelineMap) -> Result<Self::VecOutput> {
let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
let mut output_index = 0;
for transform in self.transforms.iter() {
@@ -327,7 +327,7 @@ fn resolve_number_schema(
)
}
fn values_to_row(schema_info: &mut SchemaInfo, values: BTreeMap<String, Value>) -> Result<Row> {
fn values_to_row(schema_info: &mut SchemaInfo, values: PipelineMap) -> Result<Row> {
let mut row: Vec<GreptimeValue> = Vec::with_capacity(schema_info.schema.len());
for _ in 0..schema_info.schema.len() {
row.push(GreptimeValue { value_data: None });
@@ -513,7 +513,7 @@ fn values_to_row(schema_info: &mut SchemaInfo, values: BTreeMap<String, Value>)
}
fn identity_pipeline_inner<'a>(
array: Vec<BTreeMap<String, Value>>,
array: Vec<PipelineMap>,
tag_column_names: Option<impl Iterator<Item = &'a String>>,
_params: &GreptimePipelineParams,
) -> Result<Rows> {
@@ -569,7 +569,7 @@ fn identity_pipeline_inner<'a>(
/// 4. The pipeline will return an error if the same column datatype is mismatched
/// 5. The pipeline will analyze the schema of each json record and merge them to get the final schema.
pub fn identity_pipeline(
array: Vec<BTreeMap<String, Value>>,
array: Vec<PipelineMap>,
table: Option<Arc<table::Table>>,
params: &GreptimePipelineParams,
) -> Result<Rows> {
@@ -577,7 +577,7 @@ pub fn identity_pipeline(
array
.into_iter()
.map(|item| flatten_object(item, DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING))
.collect::<Result<Vec<BTreeMap<String, Value>>>>()?
.collect::<Result<Vec<PipelineMap>>>()?
} else {
array
};
@@ -596,11 +596,8 @@ pub fn identity_pipeline(
///
/// The `max_nested_levels` parameter is used to limit the nested levels of the JSON object.
/// An error is returned if the nesting depth is greater than `max_nested_levels`.
pub fn flatten_object(
object: BTreeMap<String, Value>,
max_nested_levels: usize,
) -> Result<BTreeMap<String, Value>> {
let mut flattened = BTreeMap::new();
pub fn flatten_object(object: PipelineMap, max_nested_levels: usize) -> Result<PipelineMap> {
let mut flattened = PipelineMap::new();
if !object.is_empty() {
// it will use recursion to flatten the object.
@@ -611,9 +608,9 @@ pub fn flatten_object(
}
fn do_flatten_object(
dest: &mut BTreeMap<String, Value>,
dest: &mut PipelineMap,
base: Option<&str>,
object: BTreeMap<String, Value>,
object: PipelineMap,
current_level: usize,
max_nested_levels: usize,
) -> Result<()> {

View File

@@ -16,8 +16,6 @@ pub mod array;
pub mod map;
pub mod time;
use std::collections::BTreeMap;
pub use array::Array;
use jsonb::{Number as JsonbNumber, Object as JsonbObject, Value as JsonbValue};
use jsonpath_rust::path::{JsonLike, Path};
@@ -32,6 +30,7 @@ use super::error::{
ValueParseFloatSnafu, ValueParseIntSnafu, ValueParseTypeSnafu, ValueUnsupportedNumberTypeSnafu,
ValueUnsupportedYamlTypeSnafu, ValueYamlKeyMustBeStringSnafu,
};
use super::PipelineMap;
use crate::etl::error::{Error, Result};
/// Value can be used as type
@@ -347,7 +346,7 @@ impl TryFrom<serde_json::Value> for Value {
Ok(Value::Array(Array { values }))
}
serde_json::Value::Object(v) => {
let mut values = BTreeMap::new();
let mut values = PipelineMap::new();
for (k, v) in v {
values.insert(k, Value::try_from(v)?);
}
@@ -378,7 +377,7 @@ impl TryFrom<&yaml_rust::Yaml> for Value {
Ok(Value::Array(Array { values }))
}
yaml_rust::Yaml::Hash(v) => {
let mut values = BTreeMap::new();
let mut values = PipelineMap::new();
for (k, v) in v {
let key = k
.as_str()
@@ -458,7 +457,7 @@ impl From<Value> for JsonbValue<'_> {
}
Value::Map(obj) => {
let mut map = JsonbObject::new();
for (k, v) in obj.into_iter() {
for (k, v) in obj.values.into_iter() {
let val: JsonbValue = v.into();
map.insert(k, val);
}

View File

@@ -12,15 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use ahash::HashMap;
use crate::etl::value::Value;
use crate::PipelineMap;
#[derive(Debug, Clone, PartialEq, Default)]
pub struct Map {
pub values: BTreeMap<String, Value>,
pub values: PipelineMap,
}
impl Map {
@@ -39,24 +36,14 @@ impl Map {
}
}
impl From<HashMap<String, Value>> for Map {
fn from(values: HashMap<String, Value>) -> Self {
let mut map = Map::default();
for (k, v) in values.into_iter() {
map.insert(k, v);
}
map
}
}
impl From<BTreeMap<String, Value>> for Map {
fn from(values: BTreeMap<String, Value>) -> Self {
impl From<PipelineMap> for Map {
fn from(values: PipelineMap) -> Self {
Self { values }
}
}
impl std::ops::Deref for Map {
type Target = BTreeMap<String, Value>;
type Target = PipelineMap;
fn deref(&self) -> &Self::Target {
&self.values
@@ -69,16 +56,6 @@ impl std::ops::DerefMut for Map {
}
}
impl std::iter::IntoIterator for Map {
type Item = (String, Value);
type IntoIter = std::collections::btree_map::IntoIter<String, Value>;
fn into_iter(self) -> Self::IntoIter {
self.values.into_iter()
}
}
impl std::fmt::Display for Map {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let values = self

View File

@@ -25,8 +25,8 @@ pub use etl::transform::{GreptimeTransformer, Transformer};
pub use etl::value::{Array, Map, Value};
pub use etl::{
error as etl_error, json_array_to_intermediate_state, json_to_intermediate_state, parse,
Content, DispatchedTo, Pipeline, PipelineDefinition, PipelineExecOutput, PipelineWay,
SelectInfo, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
Content, DispatchedTo, Pipeline, PipelineDefinition, PipelineExecOutput, PipelineMap,
PipelineWay, SelectInfo, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
};
pub use manager::{
error, pipeline_operator, table, util, PipelineInfo, PipelineRef, PipelineTableRef,

View File

@@ -18,7 +18,7 @@ use std::sync::Arc;
use api::v1::{RowInsertRequest, Rows};
use pipeline::{
DispatchedTo, GreptimePipelineParams, GreptimeTransformer, Pipeline, PipelineDefinition,
PipelineExecOutput, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
PipelineExecOutput, PipelineMap, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
};
use session::context::QueryContextRef;
use snafu::ResultExt;
@@ -52,7 +52,7 @@ pub(crate) async fn run_pipeline(
state: &PipelineHandlerRef,
pipeline_definition: PipelineDefinition,
pipeline_parameters: &GreptimePipelineParams,
array: Vec<BTreeMap<String, pipeline::Value>>,
array: Vec<PipelineMap>,
table_name: String,
query_ctx: &QueryContextRef,
is_top_level: bool,
@@ -81,8 +81,7 @@ pub(crate) async fn run_pipeline(
let transform_timer = std::time::Instant::now();
let mut transformed = Vec::with_capacity(array.len());
let mut dispatched: BTreeMap<DispatchedTo, Vec<BTreeMap<String, pipeline::Value>>> =
BTreeMap::new();
let mut dispatched: BTreeMap<DispatchedTo, Vec<PipelineMap>> = BTreeMap::new();
for mut values in array {
let r = pipeline