From 81da18e5dfcf84a2479ef21281c10911ca6d463a Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Wed, 19 Feb 2025 02:41:33 -0800 Subject: [PATCH] refactor: use global type alias for pipeline input (#5568) * refactor: use global type alias for pipeline input * fmt: reformat --- src/pipeline/src/dispatcher.rs | 6 ++-- src/pipeline/src/etl.rs | 20 +++++------ src/pipeline/src/etl/processor.rs | 8 ++--- src/pipeline/src/etl/processor/cmcd.rs | 16 ++++----- src/pipeline/src/etl/processor/csv.rs | 15 ++++----- src/pipeline/src/etl/processor/date.rs | 4 +-- src/pipeline/src/etl/processor/decolorize.rs | 4 +-- src/pipeline/src/etl/processor/digest.rs | 4 +-- src/pipeline/src/etl/processor/dissect.rs | 4 +-- src/pipeline/src/etl/processor/epoch.rs | 4 +-- src/pipeline/src/etl/processor/gsub.rs | 4 +-- src/pipeline/src/etl/processor/join.rs | 4 +-- src/pipeline/src/etl/processor/json_path.rs | 6 ++-- src/pipeline/src/etl/processor/letter.rs | 4 +-- src/pipeline/src/etl/processor/regex.rs | 15 ++++----- src/pipeline/src/etl/processor/timestamp.rs | 4 +-- src/pipeline/src/etl/processor/urlencoding.rs | 5 ++- src/pipeline/src/etl/transform.rs | 5 ++- .../src/etl/transform/transformer/greptime.rs | 25 +++++++------- src/pipeline/src/etl/value.rs | 9 +++-- src/pipeline/src/etl/value/map.rs | 33 +++---------------- src/pipeline/src/lib.rs | 4 +-- src/servers/src/pipeline.rs | 7 ++-- 23 files changed, 81 insertions(+), 129 deletions(-) diff --git a/src/pipeline/src/dispatcher.rs b/src/pipeline/src/dispatcher.rs index a1c208e850..909b1afa42 100644 --- a/src/pipeline/src/dispatcher.rs +++ b/src/pipeline/src/dispatcher.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; - use common_telemetry::debug; use snafu::OptionExt; use yaml_rust::Yaml; @@ -22,7 +20,7 @@ use crate::etl::error::{ Error, FieldRequiredForDispatcherSnafu, Result, TableSuffixRequiredForDispatcherRuleSnafu, ValueRequiredForDispatcherRuleSnafu, }; -use crate::Value; +use crate::{PipelineMap, Value}; const FIELD: &str = "field"; const TABLE_SUFFIX: &str = "table_suffix"; @@ -111,7 +109,7 @@ impl TryFrom<&Yaml> for Dispatcher { impl Dispatcher { /// execute dispatcher and returns matched rule if any - pub(crate) fn exec(&self, data: &BTreeMap) -> Option<&Rule> { + pub(crate) fn exec(&self, data: &PipelineMap) -> Option<&Rule> { if let Some(value) = data.get(&self.field) { for rule in &self.rules { if rule.value == *value { diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs index deee21d8bb..56ec4539a0 100644 --- a/src/pipeline/src/etl.rs +++ b/src/pipeline/src/etl.rs @@ -20,14 +20,13 @@ pub mod processor; pub mod transform; pub mod value; -use std::collections::BTreeMap; use std::sync::Arc; use error::{ IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, YamlLoadSnafu, YamlParseSnafu, }; use itertools::Itertools; -use processor::{IntermediateStatus, Processor, Processors}; +use processor::{Processor, Processors}; use snafu::{ensure, OptionExt, ResultExt}; use transform::{Transformer, Transforms}; use value::Value; @@ -43,6 +42,8 @@ const TRANSFORM: &str = "transform"; const TRANSFORMS: &str = "transforms"; const DISPATCHER: &str = "dispatcher"; +pub type PipelineMap = std::collections::BTreeMap; + pub enum Content<'a> { Json(&'a str), Yaml(&'a str), @@ -153,10 +154,10 @@ impl PipelineExecOutput { } } -pub fn json_to_intermediate_state(val: serde_json::Value) -> Result { +pub fn json_to_intermediate_state(val: serde_json::Value) -> Result { match val { serde_json::Value::Object(map) => { - let mut intermediate_state = BTreeMap::new(); + let mut intermediate_state = PipelineMap::new(); for (k, v) in map { intermediate_state.insert(k, Value::try_from(v)?); } @@ -166,9 +167,7 @@ pub fn json_to_intermediate_state(val: serde_json::Value) -> Result, -) -> Result> { +pub fn json_array_to_intermediate_state(val: Vec) -> Result> { val.into_iter().map(json_to_intermediate_state).collect() } @@ -176,10 +175,7 @@ impl Pipeline where T: Transformer, { - pub fn exec_mut( - &self, - val: &mut BTreeMap, - ) -> Result> { + pub fn exec_mut(&self, val: &mut PipelineMap) -> Result> { for processor in self.processors.iter() { processor.exec_mut(val)?; } @@ -350,7 +346,7 @@ transform: type: timestamp, ns index: time"#; let pipeline: Pipeline = parse(&Content::Yaml(pipeline_str)).unwrap(); - let mut payload = BTreeMap::new(); + let mut payload = PipelineMap::new(); payload.insert("message".to_string(), Value::String(message)); let result = pipeline .exec_mut(&mut payload) diff --git a/src/pipeline/src/etl/processor.rs b/src/pipeline/src/etl/processor.rs index 005feca379..e09e5bdc05 100644 --- a/src/pipeline/src/etl/processor.rs +++ b/src/pipeline/src/etl/processor.rs @@ -27,8 +27,6 @@ pub mod regex; pub mod timestamp; pub mod urlencoding; -use std::collections::BTreeMap; - use cmcd::CmcdProcessor; use csv::CsvProcessor; use date::DateProcessor; @@ -51,8 +49,8 @@ use super::error::{ ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu, }; use super::field::{Field, Fields}; +use super::PipelineMap; use crate::etl::error::{Error, Result}; -use crate::etl::value::Value; use crate::etl_error::UnsupportedProcessorSnafu; const FIELD_NAME: &str = "field"; @@ -66,8 +64,6 @@ const TARGET_FIELDS_NAME: &str = "target_fields"; const JSON_PATH_NAME: &str = "json_path"; const JSON_PATH_RESULT_INDEX_NAME: &str = "result_index"; -pub type IntermediateStatus = BTreeMap; - /// Processor trait defines the interface for all processors. /// /// A processor is a transformation that can be applied to a field in a document @@ -83,7 +79,7 @@ pub trait Processor: std::fmt::Debug + Send + Sync + 'static { fn ignore_missing(&self) -> bool; /// Execute the processor on a vector which be preprocessed by the pipeline - fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()>; + fn exec_mut(&self, val: &mut PipelineMap) -> Result<()>; } #[derive(Debug)] diff --git a/src/pipeline/src/etl/processor/cmcd.rs b/src/pipeline/src/etl/processor/cmcd.rs index a5da69d0be..18c6e71998 100644 --- a/src/pipeline/src/etl/processor/cmcd.rs +++ b/src/pipeline/src/etl/processor/cmcd.rs @@ -16,12 +16,9 @@ //! //! Refer to [`CmcdProcessor`] for more information. -use std::collections::BTreeMap; - use snafu::{OptionExt, ResultExt}; use urlencoding::decode; -use super::IntermediateStatus; use crate::etl::error::{ CmcdMissingKeySnafu, CmcdMissingValueSnafu, Error, FailedToParseFloatKeySnafu, FailedToParseIntKeySnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, @@ -33,6 +30,7 @@ use crate::etl::processor::{ IGNORE_MISSING_NAME, }; use crate::etl::value::Value; +use crate::etl::PipelineMap; pub(crate) const PROCESSOR_CMCD: &str = "cmcd"; @@ -161,8 +159,8 @@ impl CmcdProcessor { format!("{}_{}", prefix, key) } - fn parse(&self, name: &str, value: &str) -> Result> { - let mut working_set = BTreeMap::new(); + fn parse(&self, name: &str, value: &str) -> Result { + let mut working_set = PipelineMap::new(); let parts = value.split(','); @@ -251,7 +249,7 @@ impl Processor for CmcdProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> { + fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { for field in self.fields.iter() { let name = field.input_field(); @@ -285,11 +283,9 @@ impl Processor for CmcdProcessor { #[cfg(test)] mod tests { - use std::collections::BTreeMap; - use urlencoding::decode; - use super::CmcdProcessor; + use super::*; use crate::etl::field::{Field, Fields}; use crate::etl::value::Value; @@ -436,7 +432,7 @@ mod tests { let expected = vec .into_iter() .map(|(k, v)| (k.to_string(), v)) - .collect::>(); + .collect::(); let actual = processor.parse("prefix", &decoded).unwrap(); assert_eq!(actual, expected); diff --git a/src/pipeline/src/etl/processor/csv.rs b/src/pipeline/src/etl/processor/csv.rs index a0fac70de1..2fe130c600 100644 --- a/src/pipeline/src/etl/processor/csv.rs +++ b/src/pipeline/src/etl/processor/csv.rs @@ -14,8 +14,6 @@ // Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/csv-processor.html -use std::collections::BTreeMap; - use csv::{ReaderBuilder, Trim}; use itertools::EitherOrBoth::{Both, Left, Right}; use itertools::Itertools; @@ -31,6 +29,7 @@ use crate::etl::processor::{ IGNORE_MISSING_NAME, }; use crate::etl::value::Value; +use crate::etl::PipelineMap; pub(crate) const PROCESSOR_CSV: &str = "csv"; @@ -60,7 +59,7 @@ pub struct CsvProcessor { impl CsvProcessor { // process the csv format string to a map with target_fields as keys - fn process(&self, val: &str) -> Result> { + fn process(&self, val: &str) -> Result { let mut reader = self.reader.from_reader(val.as_bytes()); if let Some(result) = reader.records().next() { @@ -190,7 +189,7 @@ impl Processor for CsvProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut BTreeMap) -> Result<()> { + fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { for field in self.fields.iter() { let name = field.input_field(); @@ -240,7 +239,7 @@ mod tests { let result = processor.process("1,2").unwrap(); - let values = [ + let values: PipelineMap = [ ("a".into(), Value::String("1".into())), ("b".into(), Value::String("2".into())), ] @@ -266,7 +265,7 @@ mod tests { let result = processor.process("1,2").unwrap(); - let values = [ + let values: PipelineMap = [ ("a".into(), Value::String("1".into())), ("b".into(), Value::String("2".into())), ("c".into(), Value::Null), @@ -291,7 +290,7 @@ mod tests { let result = processor.process("1,2").unwrap(); - let values = [ + let values: PipelineMap = [ ("a".into(), Value::String("1".into())), ("b".into(), Value::String("2".into())), ("c".into(), Value::String("default".into())), @@ -317,7 +316,7 @@ mod tests { let result = processor.process("1,2").unwrap(); - let values = [ + let values: PipelineMap = [ ("a".into(), Value::String("1".into())), ("b".into(), Value::String("2".into())), ] diff --git a/src/pipeline/src/etl/processor/date.rs b/src/pipeline/src/etl/processor/date.rs index e080b79540..0af0424423 100644 --- a/src/pipeline/src/etl/processor/date.rs +++ b/src/pipeline/src/etl/processor/date.rs @@ -19,7 +19,6 @@ use chrono_tz::Tz; use lazy_static::lazy_static; use snafu::{OptionExt, ResultExt}; -use super::IntermediateStatus; use crate::etl::error::{ DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateParseSnafu, DateParseTimezoneSnafu, Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, @@ -31,6 +30,7 @@ use crate::etl::processor::{ FIELD_NAME, IGNORE_MISSING_NAME, }; use crate::etl::value::{Timestamp, Value}; +use crate::etl::PipelineMap; pub(crate) const PROCESSOR_DATE: &str = "date"; @@ -194,7 +194,7 @@ impl Processor for DateProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> { + fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { diff --git a/src/pipeline/src/etl/processor/decolorize.rs b/src/pipeline/src/etl/processor/decolorize.rs index 2547b99d68..fa70f4a288 100644 --- a/src/pipeline/src/etl/processor/decolorize.rs +++ b/src/pipeline/src/etl/processor/decolorize.rs @@ -22,7 +22,6 @@ use once_cell::sync::Lazy; use regex::Regex; use snafu::OptionExt; -use super::IntermediateStatus; use crate::etl::error::{ Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result, }; @@ -31,6 +30,7 @@ use crate::etl::processor::{ yaml_bool, yaml_new_field, yaml_new_fields, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, }; use crate::etl::value::Value; +use crate::etl::PipelineMap; pub(crate) const PROCESSOR_DECOLORIZE: &str = "decolorize"; @@ -102,7 +102,7 @@ impl crate::etl::processor::Processor for DecolorizeProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> { + fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { diff --git a/src/pipeline/src/etl/processor/digest.rs b/src/pipeline/src/etl/processor/digest.rs index 64bb2a2f6d..b93af08d3c 100644 --- a/src/pipeline/src/etl/processor/digest.rs +++ b/src/pipeline/src/etl/processor/digest.rs @@ -24,7 +24,6 @@ use std::borrow::Cow; use regex::Regex; use snafu::OptionExt; -use super::IntermediateStatus; use crate::etl::error::{ Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result, }; @@ -33,6 +32,7 @@ use crate::etl::processor::{ yaml_bool, yaml_new_field, yaml_new_fields, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, }; use crate::etl::value::Value; +use crate::etl::PipelineMap; use crate::etl_error::DigestPatternInvalidSnafu; pub(crate) const PROCESSOR_DIGEST: &str = "digest"; @@ -201,7 +201,7 @@ impl crate::etl::processor::Processor for DigestProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> { + fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { diff --git a/src/pipeline/src/etl/processor/dissect.rs b/src/pipeline/src/etl/processor/dissect.rs index 9ac28f7bf0..2a41d75923 100644 --- a/src/pipeline/src/etl/processor/dissect.rs +++ b/src/pipeline/src/etl/processor/dissect.rs @@ -18,7 +18,6 @@ use ahash::{HashMap, HashMapExt, HashSet, HashSetExt}; use itertools::Itertools; use snafu::OptionExt; -use super::IntermediateStatus; use crate::etl::error::{ DissectAppendOrderAlreadySetSnafu, DissectConsecutiveNamesSnafu, DissectEmptyPatternSnafu, DissectEndModifierAlreadySetSnafu, DissectInvalidPatternSnafu, DissectModifierAlreadySetSnafu, @@ -32,6 +31,7 @@ use crate::etl::processor::{ Processor, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, PATTERNS_NAME, PATTERN_NAME, }; use crate::etl::value::Value; +use crate::etl::PipelineMap; pub(crate) const PROCESSOR_DISSECT: &str = "dissect"; @@ -601,7 +601,7 @@ impl Processor for DissectProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> { + fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { diff --git a/src/pipeline/src/etl/processor/epoch.rs b/src/pipeline/src/etl/processor/epoch.rs index 29ad6bd3d9..da638def9b 100644 --- a/src/pipeline/src/etl/processor/epoch.rs +++ b/src/pipeline/src/etl/processor/epoch.rs @@ -14,7 +14,6 @@ use snafu::{OptionExt, ResultExt}; -use super::IntermediateStatus; use crate::etl::error::{ EpochInvalidResolutionSnafu, Error, FailedToParseIntSnafu, KeyMustBeStringSnafu, ProcessorMissingFieldSnafu, ProcessorUnsupportedValueSnafu, Result, @@ -30,6 +29,7 @@ use crate::etl::value::time::{ SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION, }; use crate::etl::value::{Timestamp, Value}; +use crate::etl::PipelineMap; pub(crate) const PROCESSOR_EPOCH: &str = "epoch"; const RESOLUTION_NAME: &str = "resolution"; @@ -163,7 +163,7 @@ impl Processor for EpochProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> { + fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { diff --git a/src/pipeline/src/etl/processor/gsub.rs b/src/pipeline/src/etl/processor/gsub.rs index 7f0f601f44..8950b418df 100644 --- a/src/pipeline/src/etl/processor/gsub.rs +++ b/src/pipeline/src/etl/processor/gsub.rs @@ -15,7 +15,6 @@ use regex::Regex; use snafu::{OptionExt, ResultExt}; -use super::IntermediateStatus; use crate::etl::error::{ Error, GsubPatternRequiredSnafu, GsubReplacementRequiredSnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, RegexSnafu, Result, @@ -26,6 +25,7 @@ use crate::etl::processor::{ IGNORE_MISSING_NAME, PATTERN_NAME, }; use crate::etl::value::Value; +use crate::etl::PipelineMap; pub(crate) const PROCESSOR_GSUB: &str = "gsub"; @@ -118,7 +118,7 @@ impl crate::etl::processor::Processor for GsubProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> { + fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { diff --git a/src/pipeline/src/etl/processor/join.rs b/src/pipeline/src/etl/processor/join.rs index 72fafdbf7d..64cf6d425b 100644 --- a/src/pipeline/src/etl/processor/join.rs +++ b/src/pipeline/src/etl/processor/join.rs @@ -14,7 +14,6 @@ use snafu::OptionExt; -use super::IntermediateStatus; use crate::etl::error::{ Error, JoinSeparatorRequiredSnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result, @@ -25,6 +24,7 @@ use crate::etl::processor::{ IGNORE_MISSING_NAME, SEPARATOR_NAME, }; use crate::etl::value::{Array, Value}; +use crate::etl::PipelineMap; pub(crate) const PROCESSOR_JOIN: &str = "join"; @@ -95,7 +95,7 @@ impl Processor for JoinProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> { + fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { diff --git a/src/pipeline/src/etl/processor/json_path.rs b/src/pipeline/src/etl/processor/json_path.rs index 92916263e4..6b0e97f448 100644 --- a/src/pipeline/src/etl/processor/json_path.rs +++ b/src/pipeline/src/etl/processor/json_path.rs @@ -16,8 +16,8 @@ use jsonpath_rust::JsonPath; use snafu::{OptionExt, ResultExt}; use super::{ - yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, IntermediateStatus, Processor, - FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, JSON_PATH_NAME, JSON_PATH_RESULT_INDEX_NAME, + yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, PipelineMap, Processor, FIELDS_NAME, + FIELD_NAME, IGNORE_MISSING_NAME, JSON_PATH_NAME, JSON_PATH_RESULT_INDEX_NAME, }; use crate::etl::error::{Error, Result}; use crate::etl::field::Fields; @@ -126,7 +126,7 @@ impl Processor for JsonPathProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> { + fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { diff --git a/src/pipeline/src/etl/processor/letter.rs b/src/pipeline/src/etl/processor/letter.rs index 960521853e..1d4d248b87 100644 --- a/src/pipeline/src/etl/processor/letter.rs +++ b/src/pipeline/src/etl/processor/letter.rs @@ -14,7 +14,6 @@ use snafu::OptionExt; -use super::IntermediateStatus; use crate::etl::error::{ Error, KeyMustBeStringSnafu, LetterInvalidMethodSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result, @@ -25,6 +24,7 @@ use crate::etl::processor::{ IGNORE_MISSING_NAME, METHOD_NAME, }; use crate::etl::value::Value; +use crate::etl::PipelineMap; pub(crate) const PROCESSOR_LETTER: &str = "letter"; @@ -126,7 +126,7 @@ impl Processor for LetterProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> { + fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { diff --git a/src/pipeline/src/etl/processor/regex.rs b/src/pipeline/src/etl/processor/regex.rs index 27f30f65d9..a08b944725 100644 --- a/src/pipeline/src/etl/processor/regex.rs +++ b/src/pipeline/src/etl/processor/regex.rs @@ -18,13 +18,10 @@ const PATTERNS_NAME: &str = "patterns"; pub(crate) const PROCESSOR_REGEX: &str = "regex"; -use std::collections::BTreeMap; - use lazy_static::lazy_static; use regex::Regex; use snafu::{OptionExt, ResultExt}; -use super::IntermediateStatus; use crate::etl::error::{ Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, RegexNamedGroupNotFoundSnafu, RegexNoValidFieldSnafu, RegexNoValidPatternSnafu, RegexSnafu, @@ -36,6 +33,7 @@ use crate::etl::processor::{ FIELD_NAME, IGNORE_MISSING_NAME, PATTERN_NAME, }; use crate::etl::value::Value; +use crate::etl::PipelineMap; lazy_static! { static ref GROUPS_NAME_REGEX: Regex = Regex::new(r"\(\?P?<([[:word:]]+)>.+?\)").unwrap(); @@ -169,8 +167,8 @@ impl RegexProcessor { Ok(()) } - fn process(&self, prefix: &str, val: &str) -> Result> { - let mut result = BTreeMap::new(); + fn process(&self, prefix: &str, val: &str) -> Result { + let mut result = PipelineMap::new(); for gr in self.patterns.iter() { if let Some(captures) = gr.regex.captures(val) { for group in gr.groups.iter() { @@ -194,7 +192,7 @@ impl Processor for RegexProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> { + fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { for field in self.fields.iter() { let index = field.input_field(); let prefix = field.target_or_input_field(); @@ -227,11 +225,10 @@ impl Processor for RegexProcessor { } #[cfg(test)] mod tests { - use std::collections::BTreeMap; - use ahash::{HashMap, HashMapExt}; use itertools::Itertools; + use super::*; use crate::etl::processor::regex::RegexProcessor; use crate::etl::value::{Map, Value}; @@ -272,7 +269,7 @@ ignore_missing: false"#; let cw = "[c=w,n=US_CA_SANJOSE,o=55155]"; let breadcrumbs_str = [cc, cg, co, cp, cw].iter().join(","); - let temporary_map: BTreeMap = [ + let temporary_map: PipelineMap = [ ("breadcrumbs_parent", Value::String(cc.to_string())), ("breadcrumbs_edge", Value::String(cg.to_string())), ("breadcrumbs_origin", Value::String(co.to_string())), diff --git a/src/pipeline/src/etl/processor/timestamp.rs b/src/pipeline/src/etl/processor/timestamp.rs index bf90e78f21..a4d215ed49 100644 --- a/src/pipeline/src/etl/processor/timestamp.rs +++ b/src/pipeline/src/etl/processor/timestamp.rs @@ -19,7 +19,6 @@ use chrono_tz::Tz; use lazy_static::lazy_static; use snafu::{OptionExt, ResultExt}; -use super::IntermediateStatus; use crate::etl::error::{ DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateInvalidFormatSnafu, DateParseSnafu, DateParseTimezoneSnafu, EpochInvalidResolutionSnafu, Error, @@ -37,6 +36,7 @@ use crate::etl::value::time::{ SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION, }; use crate::etl::value::{Timestamp, Value}; +use crate::etl::PipelineMap; pub(crate) const PROCESSOR_TIMESTAMP: &str = "timestamp"; const RESOLUTION_NAME: &str = "resolution"; @@ -298,7 +298,7 @@ impl Processor for TimestampProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> { + fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { diff --git a/src/pipeline/src/etl/processor/urlencoding.rs b/src/pipeline/src/etl/processor/urlencoding.rs index c14c7d87b1..33d3f521a1 100644 --- a/src/pipeline/src/etl/processor/urlencoding.rs +++ b/src/pipeline/src/etl/processor/urlencoding.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; - use snafu::{OptionExt, ResultExt}; use urlencoding::{decode, encode}; @@ -27,6 +25,7 @@ use crate::etl::processor::{ IGNORE_MISSING_NAME, METHOD_NAME, }; use crate::etl::value::Value; +use crate::PipelineMap; pub(crate) const PROCESSOR_URL_ENCODING: &str = "urlencoding"; @@ -127,7 +126,7 @@ impl crate::etl::processor::Processor for UrlEncodingProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut BTreeMap) -> Result<()> { + fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { diff --git a/src/pipeline/src/etl/transform.rs b/src/pipeline/src/etl/transform.rs index a61444d945..14cfa440fb 100644 --- a/src/pipeline/src/etl/transform.rs +++ b/src/pipeline/src/etl/transform.rs @@ -15,8 +15,6 @@ pub mod index; pub mod transformer; -use std::collections::BTreeMap; - use snafu::OptionExt; use crate::etl::error::{Error, Result}; @@ -39,6 +37,7 @@ use super::error::{ use super::field::Fields; use super::processor::{yaml_new_field, yaml_new_fields, yaml_string}; use super::value::Timestamp; +use super::PipelineMap; pub trait Transformer: std::fmt::Debug + Sized + Send + Sync + 'static { type Output; @@ -48,7 +47,7 @@ pub trait Transformer: std::fmt::Debug + Sized + Send + Sync + 'static { fn schemas(&self) -> &Vec; fn transforms(&self) -> &Transforms; fn transforms_mut(&mut self) -> &mut Transforms; - fn transform_mut(&self, val: &mut BTreeMap) -> Result; + fn transform_mut(&self, val: &mut PipelineMap) -> Result; } /// On Failure behavior when transform fails diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index 621acc7581..0211e67db1 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -14,7 +14,7 @@ pub mod coerce; -use std::collections::{BTreeMap, HashSet}; +use std::collections::HashSet; use std::sync::Arc; use ahash::{HashMap, HashMapExt}; @@ -34,10 +34,10 @@ use crate::etl::error::{ UnsupportedNumberTypeSnafu, }; use crate::etl::field::{Field, Fields}; -use crate::etl::processor::IntermediateStatus; use crate::etl::transform::index::Index; use crate::etl::transform::{Transform, Transformer, Transforms}; use crate::etl::value::{Timestamp, Value}; +use crate::etl::PipelineMap; const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp"; const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10; @@ -178,7 +178,7 @@ impl Transformer for GreptimeTransformer { } } - fn transform_mut(&self, val: &mut IntermediateStatus) -> Result { + fn transform_mut(&self, val: &mut PipelineMap) -> Result { let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()]; let mut output_index = 0; for transform in self.transforms.iter() { @@ -327,7 +327,7 @@ fn resolve_number_schema( ) } -fn values_to_row(schema_info: &mut SchemaInfo, values: BTreeMap) -> Result { +fn values_to_row(schema_info: &mut SchemaInfo, values: PipelineMap) -> Result { let mut row: Vec = Vec::with_capacity(schema_info.schema.len()); for _ in 0..schema_info.schema.len() { row.push(GreptimeValue { value_data: None }); @@ -513,7 +513,7 @@ fn values_to_row(schema_info: &mut SchemaInfo, values: BTreeMap) } fn identity_pipeline_inner<'a>( - array: Vec>, + array: Vec, tag_column_names: Option>, _params: &GreptimePipelineParams, ) -> Result { @@ -569,7 +569,7 @@ fn identity_pipeline_inner<'a>( /// 4. The pipeline will return an error if the same column datatype is mismatched /// 5. The pipeline will analyze the schema of each json record and merge them to get the final schema. pub fn identity_pipeline( - array: Vec>, + array: Vec, table: Option>, params: &GreptimePipelineParams, ) -> Result { @@ -577,7 +577,7 @@ pub fn identity_pipeline( array .into_iter() .map(|item| flatten_object(item, DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING)) - .collect::>>>()? + .collect::>>()? } else { array }; @@ -596,11 +596,8 @@ pub fn identity_pipeline( /// /// The `max_nested_levels` parameter is used to limit the nested levels of the JSON object. /// The error will be returned if the nested levels is greater than the `max_nested_levels`. -pub fn flatten_object( - object: BTreeMap, - max_nested_levels: usize, -) -> Result> { - let mut flattened = BTreeMap::new(); +pub fn flatten_object(object: PipelineMap, max_nested_levels: usize) -> Result { + let mut flattened = PipelineMap::new(); if !object.is_empty() { // it will use recursion to flatten the object. @@ -611,9 +608,9 @@ pub fn flatten_object( } fn do_flatten_object( - dest: &mut BTreeMap, + dest: &mut PipelineMap, base: Option<&str>, - object: BTreeMap, + object: PipelineMap, current_level: usize, max_nested_levels: usize, ) -> Result<()> { diff --git a/src/pipeline/src/etl/value.rs b/src/pipeline/src/etl/value.rs index b007e66513..124d598d9b 100644 --- a/src/pipeline/src/etl/value.rs +++ b/src/pipeline/src/etl/value.rs @@ -16,8 +16,6 @@ pub mod array; pub mod map; pub mod time; -use std::collections::BTreeMap; - pub use array::Array; use jsonb::{Number as JsonbNumber, Object as JsonbObject, Value as JsonbValue}; use jsonpath_rust::path::{JsonLike, Path}; @@ -32,6 +30,7 @@ use super::error::{ ValueParseFloatSnafu, ValueParseIntSnafu, ValueParseTypeSnafu, ValueUnsupportedNumberTypeSnafu, ValueUnsupportedYamlTypeSnafu, ValueYamlKeyMustBeStringSnafu, }; +use super::PipelineMap; use crate::etl::error::{Error, Result}; /// Value can be used as type @@ -347,7 +346,7 @@ impl TryFrom for Value { Ok(Value::Array(Array { values })) } serde_json::Value::Object(v) => { - let mut values = BTreeMap::new(); + let mut values = PipelineMap::new(); for (k, v) in v { values.insert(k, Value::try_from(v)?); } @@ -378,7 +377,7 @@ impl TryFrom<&yaml_rust::Yaml> for Value { Ok(Value::Array(Array { values })) } yaml_rust::Yaml::Hash(v) => { - let mut values = BTreeMap::new(); + let mut values = PipelineMap::new(); for (k, v) in v { let key = k .as_str() @@ -458,7 +457,7 @@ impl From for JsonbValue<'_> { } Value::Map(obj) => { let mut map = JsonbObject::new(); - for (k, v) in obj.into_iter() { + for (k, v) in obj.values.into_iter() { let val: JsonbValue = v.into(); map.insert(k, val); } diff --git a/src/pipeline/src/etl/value/map.rs b/src/pipeline/src/etl/value/map.rs index 9e730ef532..b406a69343 100644 --- a/src/pipeline/src/etl/value/map.rs +++ b/src/pipeline/src/etl/value/map.rs @@ -12,15 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; - -use ahash::HashMap; - use crate::etl::value::Value; +use crate::PipelineMap; #[derive(Debug, Clone, PartialEq, Default)] pub struct Map { - pub values: BTreeMap, + pub values: PipelineMap, } impl Map { @@ -39,24 +36,14 @@ impl Map { } } -impl From> for Map { - fn from(values: HashMap) -> Self { - let mut map = Map::default(); - for (k, v) in values.into_iter() { - map.insert(k, v); - } - map - } -} - -impl From> for Map { - fn from(values: BTreeMap) -> Self { +impl From for Map { + fn from(values: PipelineMap) -> Self { Self { values } } } impl std::ops::Deref for Map { - type Target = BTreeMap; + type Target = PipelineMap; fn deref(&self) -> &Self::Target { &self.values @@ -69,16 +56,6 @@ impl std::ops::DerefMut for Map { } } -impl std::iter::IntoIterator for Map { - type Item = (String, Value); - - type IntoIter = std::collections::btree_map::IntoIter; - - fn into_iter(self) -> Self::IntoIter { - self.values.into_iter() - } -} - impl std::fmt::Display for Map { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { let values = self diff --git a/src/pipeline/src/lib.rs b/src/pipeline/src/lib.rs index a6c82f9353..2b358c4572 100644 --- a/src/pipeline/src/lib.rs +++ b/src/pipeline/src/lib.rs @@ -25,8 +25,8 @@ pub use etl::transform::{GreptimeTransformer, Transformer}; pub use etl::value::{Array, Map, Value}; pub use etl::{ error as etl_error, json_array_to_intermediate_state, json_to_intermediate_state, parse, - Content, DispatchedTo, Pipeline, PipelineDefinition, PipelineExecOutput, PipelineWay, - SelectInfo, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, + Content, DispatchedTo, Pipeline, PipelineDefinition, PipelineExecOutput, PipelineMap, + PipelineWay, SelectInfo, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, }; pub use manager::{ error, pipeline_operator, table, util, PipelineInfo, PipelineRef, PipelineTableRef, diff --git a/src/servers/src/pipeline.rs b/src/servers/src/pipeline.rs index 27c4d2757a..e952e4ba8a 100644 --- a/src/servers/src/pipeline.rs +++ b/src/servers/src/pipeline.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use api::v1::{RowInsertRequest, Rows}; use pipeline::{ DispatchedTo, GreptimePipelineParams, GreptimeTransformer, Pipeline, PipelineDefinition, - PipelineExecOutput, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, + PipelineExecOutput, PipelineMap, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, }; use session::context::QueryContextRef; use snafu::ResultExt; @@ -52,7 +52,7 @@ pub(crate) async fn run_pipeline( state: &PipelineHandlerRef, pipeline_definition: PipelineDefinition, pipeline_parameters: &GreptimePipelineParams, - array: Vec>, + array: Vec, table_name: String, query_ctx: &QueryContextRef, is_top_level: bool, @@ -81,8 +81,7 @@ pub(crate) async fn run_pipeline( let transform_timer = std::time::Instant::now(); let mut transformed = Vec::with_capacity(array.len()); - let mut dispatched: BTreeMap>> = - BTreeMap::new(); + let mut dispatched: BTreeMap> = BTreeMap::new(); for mut values in array { let r = pipeline