From 041b683a8df7ce6c82e2905666188e70da1fe217 Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Wed, 11 Jun 2025 10:02:32 -0700 Subject: [PATCH] refactor: remove `PipelineMap` and use `Value` instead (#6278) * refactor: remove pipeline_map and use value instead * chore: remove unused comments * chore: move error to illegal state --- src/pipeline/src/dispatcher.rs | 6 +- src/pipeline/src/error.rs | 8 +- src/pipeline/src/etl.rs | 35 +++++---- src/pipeline/src/etl/ctx_req.rs | 11 ++- src/pipeline/src/etl/processor.rs | 4 +- src/pipeline/src/etl/processor/cmcd.rs | 13 ++-- src/pipeline/src/etl/processor/csv.rs | 17 ++-- src/pipeline/src/etl/processor/date.rs | 5 +- src/pipeline/src/etl/processor/decolorize.rs | 5 +- src/pipeline/src/etl/processor/digest.rs | 5 +- src/pipeline/src/etl/processor/dissect.rs | 5 +- src/pipeline/src/etl/processor/epoch.rs | 5 +- src/pipeline/src/etl/processor/gsub.rs | 5 +- src/pipeline/src/etl/processor/join.rs | 5 +- src/pipeline/src/etl/processor/json_parse.rs | 8 +- src/pipeline/src/etl/processor/json_path.rs | 8 +- src/pipeline/src/etl/processor/letter.rs | 5 +- src/pipeline/src/etl/processor/regex.rs | 13 ++-- src/pipeline/src/etl/processor/select.rs | 45 ++++++----- .../src/etl/processor/simple_extract.rs | 6 +- src/pipeline/src/etl/processor/timestamp.rs | 5 +- src/pipeline/src/etl/processor/urlencoding.rs | 5 +- src/pipeline/src/etl/processor/vrl.rs | 21 +++-- .../src/etl/transform/transformer/greptime.rs | 37 +++++---- src/pipeline/src/etl/value.rs | 78 +++++++++++++++---- src/pipeline/src/etl/value/map.rs | 11 +-- src/pipeline/src/lib.rs | 3 +- src/pipeline/src/tablesuffix.rs | 4 +- src/servers/src/http/event.rs | 74 ++++++++++-------- src/servers/src/interceptor.rs | 10 +-- src/servers/src/pipeline.rs | 4 +- src/servers/src/proto.rs | 14 ++-- 32 files changed, 273 insertions(+), 207 deletions(-) diff --git a/src/pipeline/src/dispatcher.rs b/src/pipeline/src/dispatcher.rs index f34140b1bc..2213b85d58 100644 --- a/src/pipeline/src/dispatcher.rs +++ b/src/pipeline/src/dispatcher.rs @@ -21,7 +21,7 @@ use crate::error::{ ValueRequiredForDispatcherRuleSnafu, }; use crate::etl::ctx_req::TABLE_SUFFIX_KEY; -use crate::{PipelineMap, Value}; +use crate::Value; const FIELD: &str = "field"; const PIPELINE: &str = "pipeline"; @@ -109,7 +109,7 @@ impl TryFrom<&Yaml> for Dispatcher { impl Dispatcher { /// execute dispatcher and returns matched rule if any - pub(crate) fn exec(&self, data: &PipelineMap) -> Option<&Rule> { + pub(crate) fn exec(&self, data: &Value) -> Option<&Rule> { if let Some(value) = data.get(&self.field) { for rule in &self.rules { if rule.value == *value { @@ -119,7 +119,7 @@ impl Dispatcher { None } else { - debug!("field {} not found in keys {:?}", &self.field, data.keys()); + debug!("field {} not found in keys {:?}", &self.field, data); None } } diff --git a/src/pipeline/src/error.rs b/src/pipeline/src/error.rs index f402903498..f2f3544bd8 100644 --- a/src/pipeline/src/error.rs +++ b/src/pipeline/src/error.rs @@ -734,6 +734,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Top level value must be map"))] + ValueMustBeMap { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to build DataFusion logical plan"))] BuildDfLogicalPlan { #[snafu(source)] @@ -809,7 +815,7 @@ impl ErrorExt for Error { PipelineNotFound { .. } | InvalidPipelineVersion { .. } | InvalidCustomTimeIndex { .. } => StatusCode::InvalidArguments, - MultiPipelineWithDiffSchema { .. 
} => StatusCode::IllegalState,
+            MultiPipelineWithDiffSchema { .. } | ValueMustBeMap { .. } => StatusCode::IllegalState,
 
             BuildDfLogicalPlan { .. } | RecordBatchLenNotMatch { .. } => StatusCode::Internal,
             ExecuteInternalStatement { source, .. } => source.status_code(),
             DataFrame { source, .. } => source.status_code(),
diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs
index 49784a5fe5..d2b48bc818 100644
--- a/src/pipeline/src/etl.rs
+++ b/src/pipeline/src/etl.rs
@@ -19,6 +19,8 @@ pub mod processor;
 pub mod transform;
 pub mod value;
 
+use std::collections::BTreeMap;
+
 use ahash::{HashMap, HashMapExt};
 use api::v1::Row;
 use common_time::timestamp::TimeUnit;
@@ -31,7 +33,7 @@ use yaml_rust::YamlLoader;
 use crate::dispatcher::{Dispatcher, Rule};
 use crate::error::{
     AutoTransformOneTimestampSnafu, InputValueMustBeObjectSnafu, IntermediateKeyIndexSnafu, Result,
-    YamlLoadSnafu, YamlParseSnafu,
+    ValueMustBeMapSnafu, YamlLoadSnafu, YamlParseSnafu,
 };
 use crate::etl::ctx_req::TABLE_SUFFIX_KEY;
 use crate::etl::processor::ProcessorKind;
@@ -45,8 +47,6 @@ const TRANSFORMS: &str = "transforms";
 const DISPATCHER: &str = "dispatcher";
 const TABLESUFFIX: &str = "table_suffix";
 
-pub type PipelineMap = std::collections::BTreeMap<String, Value>;
-
 pub enum Content<'a> {
     Json(&'a str),
     Yaml(&'a str),
@@ -155,7 +155,7 @@ impl DispatchedTo {
 pub enum PipelineExecOutput {
     Transformed(TransformedOutput),
     AutoTransform(AutoTransformOutput),
-    DispatchedTo(DispatchedTo, PipelineMap),
+    DispatchedTo(DispatchedTo, Value),
 }
 
 #[derive(Debug)]
@@ -163,7 +163,7 @@ pub struct TransformedOutput {
     pub opt: ContextOpt,
     pub row: Row,
     pub table_suffix: Option<String>,
-    pub pipeline_map: PipelineMap,
+    pub pipeline_map: Value,
 }
 
 #[derive(Debug)]
@@ -171,7 +171,7 @@ pub struct AutoTransformOutput {
     pub table_suffix: Option<String>,
     // ts_column_name -> unit
     pub ts_unit_map: HashMap<String, TimeUnit>,
-    pub pipeline_map: PipelineMap,
+    pub pipeline_map: Value,
 }
 
 impl PipelineExecOutput {
@@ -197,42 +197,42 @@ impl PipelineExecOutput {
     }
 }
 
-pub fn json_to_map(val: serde_json::Value) -> Result<PipelineMap> {
+pub fn json_to_map(val: serde_json::Value) -> Result<Value> {
     match val {
         serde_json::Value::Object(map) => {
-            let mut intermediate_state = PipelineMap::new();
+            let mut intermediate_state = BTreeMap::new();
             for (k, v) in map {
                 intermediate_state.insert(k, Value::try_from(v)?);
             }
-            Ok(intermediate_state)
+            Ok(Value::Map(intermediate_state.into()))
         }
         _ => InputValueMustBeObjectSnafu.fail(),
    }
 }
 
-pub fn json_array_to_map(val: Vec<serde_json::Value>) -> Result<Vec<PipelineMap>> {
+pub fn json_array_to_map(val: Vec<serde_json::Value>) -> Result<Vec<Value>> {
     val.into_iter().map(json_to_map).collect()
 }
 
-pub fn simd_json_to_map(val: simd_json::OwnedValue) -> Result<PipelineMap> {
+pub fn simd_json_to_map(val: simd_json::OwnedValue) -> Result<Value> {
     match val {
         simd_json::OwnedValue::Object(map) => {
-            let mut intermediate_state = PipelineMap::new();
+            let mut intermediate_state = BTreeMap::new();
             for (k, v) in map.into_iter() {
                 intermediate_state.insert(k, Value::try_from(v)?);
             }
-            Ok(intermediate_state)
+            Ok(Value::Map(intermediate_state.into()))
         }
         _ => InputValueMustBeObjectSnafu.fail(),
     }
 }
 
-pub fn simd_json_array_to_map(val: Vec<simd_json::OwnedValue>) -> Result<Vec<PipelineMap>> {
+pub fn simd_json_array_to_map(val: Vec<simd_json::OwnedValue>) -> Result<Vec<Value>> {
     val.into_iter().map(simd_json_to_map).collect()
 }
 
 impl Pipeline {
-    pub fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineExecOutput> {
+    pub fn exec_mut(&self, mut val: Value) -> Result<PipelineExecOutput> {
         // process
         for processor in self.processors.iter() {
             val = processor.exec_mut(val)?;
@@ -263,7 +263,7 @@ impl Pipeline {
         let mut ts_unit_map = HashMap::with_capacity(4);
 
         // get all ts values
-        for (k, v) in val.iter() {
+        for (k, v) in val.as_map_mut().context(ValueMustBeMapSnafu)? {
             if let Value::Timestamp(ts) = v {
                 if !ts_unit_map.contains_key(k) {
                     ts_unit_map.insert(k.clone(), ts.get_unit());
                 }
             }
@@ -378,8 +378,9 @@ transform:
       type: timestamp, ns
     index: time"#;
         let pipeline: Pipeline = parse(&Content::Yaml(pipeline_str)).unwrap();
-        let mut payload = PipelineMap::new();
+        let mut payload = BTreeMap::new();
         payload.insert("message".to_string(), Value::String(message));
+        let payload = Value::Map(payload.into());
         let result = pipeline
             .exec_mut(payload)
             .unwrap()
diff --git a/src/pipeline/src/etl/ctx_req.rs b/src/pipeline/src/etl/ctx_req.rs
index c4b38c8964..d0e86ff629 100644
--- a/src/pipeline/src/etl/ctx_req.rs
+++ b/src/pipeline/src/etl/ctx_req.rs
@@ -18,9 +18,11 @@ use std::sync::Arc;
 use ahash::{HashMap, HashMapExt};
 use api::v1::{RowInsertRequest, RowInsertRequests, Rows};
 use session::context::{QueryContext, QueryContextRef};
+use snafu::OptionExt;
 
+use crate::error::{Result, ValueMustBeMapSnafu};
 use crate::tablesuffix::TableSuffixTemplate;
-use crate::PipelineMap;
+use crate::Value;
 
 const GREPTIME_AUTO_CREATE_TABLE: &str = "greptime_auto_create_table";
 const GREPTIME_TTL: &str = "greptime_ttl";
@@ -71,7 +73,8 @@ pub struct ContextOpt {
 }
 
 impl ContextOpt {
-    pub fn from_pipeline_map_to_opt(pipeline_map: &mut PipelineMap) -> Self {
+    pub fn from_pipeline_map_to_opt(pipeline_map: &mut Value) -> Result<Self> {
+        let pipeline_map = pipeline_map.as_map_mut().context(ValueMustBeMapSnafu)?;
         let mut opt = Self::default();
         for k in PIPELINE_HINT_KEYS {
             if let Some(v) = pipeline_map.remove(k) {
@@ -101,13 +104,13 @@ impl ContextOpt {
                 }
             }
         }
-        opt
+        Ok(opt)
     }
 
     pub(crate) fn resolve_table_suffix(
         &mut self,
         table_suffix: Option<&TableSuffixTemplate>,
-        pipeline_map: &PipelineMap,
+        pipeline_map: &Value,
     ) -> Option<String> {
         self.table_suffix
             .take()
diff --git a/src/pipeline/src/etl/processor.rs b/src/pipeline/src/etl/processor.rs
index 02681d86fe..dfee5c03b0 100644
--- a/src/pipeline/src/etl/processor.rs
+++ b/src/pipeline/src/etl/processor.rs
@@ -60,7 +60,7 @@ use crate::etl::processor::json_parse::JsonParseProcessor;
 use crate::etl::processor::select::SelectProcessor;
 use crate::etl::processor::simple_extract::SimpleExtractProcessor;
 use crate::etl::processor::vrl::VrlProcessor;
-use crate::etl::PipelineMap;
+use crate::Value;
 
 const FIELD_NAME: &str = "field";
 const FIELDS_NAME: &str = "fields";
@@ -125,7 +125,7 @@ pub trait Processor: std::fmt::Debug + Send + Sync + 'static {
     fn ignore_missing(&self) -> bool;
 
     /// Execute the processor on a vector which be preprocessed by the pipeline
-    fn exec_mut(&self, val: PipelineMap) -> Result<PipelineMap>;
+    fn exec_mut(&self, val: Value) -> Result<Value>;
 }
 
 #[derive(Debug)]
diff --git a/src/pipeline/src/etl/processor/cmcd.rs b/src/pipeline/src/etl/processor/cmcd.rs
index 9959bb6a96..3d736192c7 100644
--- a/src/pipeline/src/etl/processor/cmcd.rs
+++ b/src/pipeline/src/etl/processor/cmcd.rs
@@ -16,6 +16,8 @@
 //!
 //! Refer to [`CmcdProcessor`] for more information.
 
+use std::collections::BTreeMap; + use snafu::{OptionExt, ResultExt}; use urlencoding::decode; @@ -30,7 +32,6 @@ use crate::etl::processor::{ IGNORE_MISSING_NAME, }; use crate::etl::value::Value; -use crate::etl::PipelineMap; pub(crate) const PROCESSOR_CMCD: &str = "cmcd"; @@ -159,8 +160,8 @@ impl CmcdProcessor { format!("{}_{}", prefix, key) } - fn parse(&self, name: &str, value: &str) -> Result { - let mut working_set = PipelineMap::new(); + fn parse(&self, name: &str, value: &str) -> Result> { + let mut working_set = BTreeMap::new(); let parts = value.split(','); @@ -249,14 +250,14 @@ impl Processor for CmcdProcessor { self.ignore_missing } - fn exec_mut(&self, mut val: PipelineMap) -> Result { + fn exec_mut(&self, mut val: Value) -> Result { for field in self.fields.iter() { let name = field.input_field(); match val.get(name) { Some(Value::String(s)) => { let results = self.parse(field.target_or_input_field(), s)?; - val.extend(results); + val.extend(results.into())?; } Some(Value::Null) | None => { if !self.ignore_missing { @@ -432,7 +433,7 @@ mod tests { let expected = vec .into_iter() .map(|(k, v)| (k.to_string(), v)) - .collect::(); + .collect::>(); let actual = processor.parse("prefix", &decoded).unwrap(); assert_eq!(actual, expected); diff --git a/src/pipeline/src/etl/processor/csv.rs b/src/pipeline/src/etl/processor/csv.rs index 720faf1c87..b15ee42dc9 100644 --- a/src/pipeline/src/etl/processor/csv.rs +++ b/src/pipeline/src/etl/processor/csv.rs @@ -14,6 +14,8 @@ // Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/csv-processor.html +use std::collections::BTreeMap; + use csv::{ReaderBuilder, Trim}; use itertools::EitherOrBoth::{Both, Left, Right}; use itertools::Itertools; @@ -29,7 +31,6 @@ use crate::etl::processor::{ IGNORE_MISSING_NAME, }; use crate::etl::value::Value; -use crate::etl::PipelineMap; pub(crate) const PROCESSOR_CSV: &str = "csv"; @@ -59,7 +60,7 @@ pub struct CsvProcessor { impl CsvProcessor { // process the csv format string to a map with target_fields as keys - fn process(&self, val: &str) -> Result { + fn process(&self, val: &str) -> Result> { let mut reader = self.reader.from_reader(val.as_bytes()); if let Some(result) = reader.records().next() { @@ -189,14 +190,14 @@ impl Processor for CsvProcessor { self.ignore_missing } - fn exec_mut(&self, mut val: PipelineMap) -> Result { + fn exec_mut(&self, mut val: Value) -> Result { for field in self.fields.iter() { let name = field.input_field(); match val.get(name) { Some(Value::String(v)) => { let results = self.process(v)?; - val.extend(results); + val.extend(results.into())?; } Some(Value::Null) | None => { if !self.ignore_missing { @@ -239,7 +240,7 @@ mod tests { let result = processor.process("1,2").unwrap(); - let values: PipelineMap = [ + let values: BTreeMap = [ ("a".into(), Value::String("1".into())), ("b".into(), Value::String("2".into())), ] @@ -265,7 +266,7 @@ mod tests { let result = processor.process("1,2").unwrap(); - let values: PipelineMap = [ + let values: BTreeMap = [ ("a".into(), Value::String("1".into())), ("b".into(), Value::String("2".into())), ("c".into(), Value::Null), @@ -290,7 +291,7 @@ mod tests { let result = processor.process("1,2").unwrap(); - let values: PipelineMap = [ + let values: BTreeMap = [ ("a".into(), Value::String("1".into())), ("b".into(), Value::String("2".into())), ("c".into(), Value::String("default".into())), @@ -316,7 +317,7 @@ mod tests { let result = processor.process("1,2").unwrap(); - let values: PipelineMap = [ + let values: BTreeMap = [ 
("a".into(), Value::String("1".into())), ("b".into(), Value::String("2".into())), ] diff --git a/src/pipeline/src/etl/processor/date.rs b/src/pipeline/src/etl/processor/date.rs index 691c4b3732..102c3fb7f2 100644 --- a/src/pipeline/src/etl/processor/date.rs +++ b/src/pipeline/src/etl/processor/date.rs @@ -30,7 +30,6 @@ use crate::etl::processor::{ FIELD_NAME, IGNORE_MISSING_NAME, }; use crate::etl::value::{Timestamp, Value}; -use crate::etl::PipelineMap; pub(crate) const PROCESSOR_DATE: &str = "date"; @@ -198,14 +197,14 @@ impl Processor for DateProcessor { self.ignore_missing } - fn exec_mut(&self, mut val: PipelineMap) -> Result { + fn exec_mut(&self, mut val: Value) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { Some(Value::String(s)) => { let timestamp = self.parse(s)?; let output_key = field.target_or_input_field(); - val.insert(output_key.to_string(), Value::Timestamp(timestamp)); + val.insert(output_key.to_string(), Value::Timestamp(timestamp))?; } Some(Value::Null) | None => { if !self.ignore_missing { diff --git a/src/pipeline/src/etl/processor/decolorize.rs b/src/pipeline/src/etl/processor/decolorize.rs index 0b334dc6a2..251332519e 100644 --- a/src/pipeline/src/etl/processor/decolorize.rs +++ b/src/pipeline/src/etl/processor/decolorize.rs @@ -30,7 +30,6 @@ use crate::etl::processor::{ yaml_bool, yaml_new_field, yaml_new_fields, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, }; use crate::etl::value::Value; -use crate::etl::PipelineMap; pub(crate) const PROCESSOR_DECOLORIZE: &str = "decolorize"; @@ -102,7 +101,7 @@ impl crate::etl::processor::Processor for DecolorizeProcessor { self.ignore_missing } - fn exec_mut(&self, mut val: PipelineMap) -> Result { + fn exec_mut(&self, mut val: Value) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { @@ -118,7 +117,7 @@ impl crate::etl::processor::Processor for DecolorizeProcessor { Some(v) => { let result = self.process(v)?; let output_index = field.target_or_input_field(); - val.insert(output_index.to_string(), result); + val.insert(output_index.to_string(), result)?; } } } diff --git a/src/pipeline/src/etl/processor/digest.rs b/src/pipeline/src/etl/processor/digest.rs index e5ea493689..9a2efef772 100644 --- a/src/pipeline/src/etl/processor/digest.rs +++ b/src/pipeline/src/etl/processor/digest.rs @@ -33,7 +33,6 @@ use crate::etl::processor::{ yaml_bool, yaml_new_field, yaml_new_fields, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, }; use crate::etl::value::Value; -use crate::etl::PipelineMap; pub(crate) const PROCESSOR_DIGEST: &str = "digest"; @@ -201,7 +200,7 @@ impl crate::etl::processor::Processor for DigestProcessor { self.ignore_missing } - fn exec_mut(&self, mut val: PipelineMap) -> Result { + fn exec_mut(&self, mut val: Value) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { @@ -217,7 +216,7 @@ impl crate::etl::processor::Processor for DigestProcessor { Some(v) => { let result = self.process(v)?; let output_index = field.target_or_input_field(); - val.insert(output_index.to_string(), result); + val.insert(output_index.to_string(), result)?; } } } diff --git a/src/pipeline/src/etl/processor/dissect.rs b/src/pipeline/src/etl/processor/dissect.rs index c72de8bd40..4a32a59b28 100644 --- a/src/pipeline/src/etl/processor/dissect.rs +++ b/src/pipeline/src/etl/processor/dissect.rs @@ -31,7 +31,6 @@ use crate::etl::processor::{ Processor, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, 
PATTERNS_NAME, PATTERN_NAME, }; use crate::etl::value::Value; -use crate::etl::PipelineMap; pub(crate) const PROCESSOR_DISSECT: &str = "dissect"; @@ -601,14 +600,14 @@ impl Processor for DissectProcessor { self.ignore_missing } - fn exec_mut(&self, mut val: PipelineMap) -> Result { + fn exec_mut(&self, mut val: Value) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { Some(Value::String(val_str)) => { let r = self.process(val_str)?; for (k, v) in r { - val.insert(k, v); + val.insert(k, v)?; } } Some(Value::Null) | None => { diff --git a/src/pipeline/src/etl/processor/epoch.rs b/src/pipeline/src/etl/processor/epoch.rs index 24f0790890..52be05da82 100644 --- a/src/pipeline/src/etl/processor/epoch.rs +++ b/src/pipeline/src/etl/processor/epoch.rs @@ -29,7 +29,6 @@ use crate::etl::value::time::{ SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION, }; use crate::etl::value::{Timestamp, Value}; -use crate::etl::PipelineMap; pub(crate) const PROCESSOR_EPOCH: &str = "epoch"; const RESOLUTION_NAME: &str = "resolution"; @@ -167,7 +166,7 @@ impl Processor for EpochProcessor { self.ignore_missing } - fn exec_mut(&self, mut val: PipelineMap) -> Result { + fn exec_mut(&self, mut val: Value) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { @@ -183,7 +182,7 @@ impl Processor for EpochProcessor { Some(v) => { let timestamp = self.parse(v)?; let output_index = field.target_or_input_field(); - val.insert(output_index.to_string(), Value::Timestamp(timestamp)); + val.insert(output_index.to_string(), Value::Timestamp(timestamp))?; } } } diff --git a/src/pipeline/src/etl/processor/gsub.rs b/src/pipeline/src/etl/processor/gsub.rs index 1f46856d2e..06047b6dfb 100644 --- a/src/pipeline/src/etl/processor/gsub.rs +++ b/src/pipeline/src/etl/processor/gsub.rs @@ -25,7 +25,6 @@ use crate::etl::processor::{ IGNORE_MISSING_NAME, PATTERN_NAME, }; use crate::etl::value::Value; -use crate::etl::PipelineMap; pub(crate) const PROCESSOR_GSUB: &str = "gsub"; @@ -118,7 +117,7 @@ impl crate::etl::processor::Processor for GsubProcessor { self.ignore_missing } - fn exec_mut(&self, mut val: PipelineMap) -> Result { + fn exec_mut(&self, mut val: Value) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { @@ -134,7 +133,7 @@ impl crate::etl::processor::Processor for GsubProcessor { Some(v) => { let result = self.process(v)?; let output_index = field.target_or_input_field(); - val.insert(output_index.to_string(), result); + val.insert(output_index.to_string(), result)?; } } } diff --git a/src/pipeline/src/etl/processor/join.rs b/src/pipeline/src/etl/processor/join.rs index 74de9e99ae..816d38187e 100644 --- a/src/pipeline/src/etl/processor/join.rs +++ b/src/pipeline/src/etl/processor/join.rs @@ -24,7 +24,6 @@ use crate::etl::processor::{ IGNORE_MISSING_NAME, SEPARATOR_NAME, }; use crate::etl::value::{Array, Value}; -use crate::etl::PipelineMap; pub(crate) const PROCESSOR_JOIN: &str = "join"; @@ -95,14 +94,14 @@ impl Processor for JoinProcessor { self.ignore_missing } - fn exec_mut(&self, mut val: PipelineMap) -> Result { + fn exec_mut(&self, mut val: Value) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { Some(Value::Array(arr)) => { let result = self.process(arr)?; let output_index = field.target_or_input_field(); - val.insert(output_index.to_string(), result); + val.insert(output_index.to_string(), result)?; } Some(Value::Null) | None => { if 
!self.ignore_missing { diff --git a/src/pipeline/src/etl/processor/json_parse.rs b/src/pipeline/src/etl/processor/json_parse.rs index ef3d021b5c..84ea18ebdc 100644 --- a/src/pipeline/src/etl/processor/json_parse.rs +++ b/src/pipeline/src/etl/processor/json_parse.rs @@ -22,7 +22,7 @@ use crate::etl::field::Fields; use crate::etl::processor::{ yaml_bool, yaml_new_field, yaml_new_fields, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, }; -use crate::{json_to_map, PipelineMap, Processor, Value}; +use crate::{json_to_map, Processor, Value}; pub(crate) const PROCESSOR_JSON_PARSE: &str = "json_parse"; @@ -77,7 +77,7 @@ impl JsonParseProcessor { }; let parsed: serde_json::Value = serde_json::from_str(json_str).context(JsonParseSnafu)?; match parsed { - serde_json::Value::Object(_) => Ok(Value::Map(json_to_map(parsed)?.into())), + serde_json::Value::Object(_) => Ok(json_to_map(parsed)?), serde_json::Value::Array(arr) => Ok(Value::Array(arr.try_into()?)), _ => ProcessorUnsupportedValueSnafu { processor: self.kind(), @@ -97,14 +97,14 @@ impl Processor for JsonParseProcessor { self.ignore_missing } - fn exec_mut(&self, mut val: PipelineMap) -> Result { + fn exec_mut(&self, mut val: Value) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { Some(v) => { let processed = self.process_field(v)?; let output_index = field.target_or_input_field(); - val.insert(output_index.to_string(), processed); + val.insert(output_index.to_string(), processed)?; } None => { if !self.ignore_missing { diff --git a/src/pipeline/src/etl/processor/json_path.rs b/src/pipeline/src/etl/processor/json_path.rs index c33af79ffe..df515f966d 100644 --- a/src/pipeline/src/etl/processor/json_path.rs +++ b/src/pipeline/src/etl/processor/json_path.rs @@ -21,8 +21,8 @@ use crate::error::{ }; use crate::etl::field::Fields; use crate::etl::processor::{ - yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, PipelineMap, Processor, FIELDS_NAME, - FIELD_NAME, IGNORE_MISSING_NAME, JSON_PATH_NAME, JSON_PATH_RESULT_INDEX_NAME, + yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME, + IGNORE_MISSING_NAME, JSON_PATH_NAME, JSON_PATH_RESULT_INDEX_NAME, }; use crate::Value; @@ -125,14 +125,14 @@ impl Processor for JsonPathProcessor { self.ignore_missing } - fn exec_mut(&self, mut val: PipelineMap) -> Result { + fn exec_mut(&self, mut val: Value) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { Some(v) => { let processed = self.process_field(v)?; let output_index = field.target_or_input_field(); - val.insert(output_index.to_string(), processed); + val.insert(output_index.to_string(), processed)?; } None => { if !self.ignore_missing { diff --git a/src/pipeline/src/etl/processor/letter.rs b/src/pipeline/src/etl/processor/letter.rs index 1bc94b2c20..2e8c894bce 100644 --- a/src/pipeline/src/etl/processor/letter.rs +++ b/src/pipeline/src/etl/processor/letter.rs @@ -24,7 +24,6 @@ use crate::etl::processor::{ IGNORE_MISSING_NAME, METHOD_NAME, }; use crate::etl::value::Value; -use crate::etl::PipelineMap; pub(crate) const PROCESSOR_LETTER: &str = "letter"; @@ -126,14 +125,14 @@ impl Processor for LetterProcessor { self.ignore_missing } - fn exec_mut(&self, mut val: PipelineMap) -> Result { + fn exec_mut(&self, mut val: Value) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { Some(Value::String(s)) => { let result = self.process_field(s)?; let output_key = 
field.target_or_input_field(); - val.insert(output_key.to_string(), result); + val.insert(output_key.to_string(), result)?; } Some(Value::Null) | None => { if !self.ignore_missing { diff --git a/src/pipeline/src/etl/processor/regex.rs b/src/pipeline/src/etl/processor/regex.rs index 58fd01e5aa..b9ff830666 100644 --- a/src/pipeline/src/etl/processor/regex.rs +++ b/src/pipeline/src/etl/processor/regex.rs @@ -18,6 +18,8 @@ const PATTERNS_NAME: &str = "patterns"; pub(crate) const PROCESSOR_REGEX: &str = "regex"; +use std::collections::BTreeMap; + use lazy_static::lazy_static; use regex::Regex; use snafu::{OptionExt, ResultExt}; @@ -33,7 +35,6 @@ use crate::etl::processor::{ FIELD_NAME, IGNORE_MISSING_NAME, PATTERN_NAME, }; use crate::etl::value::Value; -use crate::etl::PipelineMap; lazy_static! { static ref GROUPS_NAME_REGEX: Regex = Regex::new(r"\(\?P?<([[:word:]]+)>.+?\)").unwrap(); @@ -167,8 +168,8 @@ impl RegexProcessor { Ok(()) } - fn process(&self, prefix: &str, val: &str) -> Result { - let mut result = PipelineMap::new(); + fn process(&self, prefix: &str, val: &str) -> Result> { + let mut result = BTreeMap::new(); for gr in self.patterns.iter() { if let Some(captures) = gr.regex.captures(val) { for group in gr.groups.iter() { @@ -192,14 +193,14 @@ impl Processor for RegexProcessor { self.ignore_missing } - fn exec_mut(&self, mut val: PipelineMap) -> Result { + fn exec_mut(&self, mut val: Value) -> Result { for field in self.fields.iter() { let index = field.input_field(); let prefix = field.target_or_input_field(); match val.get(index) { Some(Value::String(s)) => { let result = self.process(prefix, s)?; - val.extend(result); + val.extend(result.into())?; } Some(Value::Null) | None => { if !self.ignore_missing { @@ -269,7 +270,7 @@ ignore_missing: false"#; let cw = "[c=w,n=US_CA_SANJOSE,o=55155]"; let breadcrumbs_str = [cc, cg, co, cp, cw].iter().join(","); - let temporary_map: PipelineMap = [ + let temporary_map: BTreeMap = [ ("breadcrumbs_parent", Value::String(cc.to_string())), ("breadcrumbs_edge", Value::String(cg.to_string())), ("breadcrumbs_origin", Value::String(co.to_string())), diff --git a/src/pipeline/src/etl/processor/select.rs b/src/pipeline/src/etl/processor/select.rs index 0cea542949..dbb3a11353 100644 --- a/src/pipeline/src/etl/processor/select.rs +++ b/src/pipeline/src/etl/processor/select.rs @@ -15,12 +15,14 @@ use ahash::{HashSet, HashSetExt}; use snafu::OptionExt; -use crate::error::{Error, KeyMustBeStringSnafu, ProcessorUnsupportedValueSnafu, Result}; +use crate::error::{ + Error, KeyMustBeStringSnafu, ProcessorUnsupportedValueSnafu, Result, ValueMustBeMapSnafu, +}; use crate::etl::field::Fields; use crate::etl::processor::{ yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME, TYPE_NAME, }; -use crate::{PipelineMap, Processor}; +use crate::{Processor, Value}; pub(crate) const PROCESSOR_SELECT: &str = "select"; const INCLUDE_KEY: &str = "include"; @@ -96,27 +98,29 @@ impl Processor for SelectProcessor { true } - fn exec_mut(&self, mut val: PipelineMap) -> Result { + fn exec_mut(&self, mut val: Value) -> Result { + let v_map = val.as_map_mut().context(ValueMustBeMapSnafu)?; + match self.select_type { SelectType::Include => { - let mut include_key_set = HashSet::with_capacity(val.len()); + let mut include_key_set = HashSet::with_capacity(v_map.len()); for field in self.fields.iter() { // If the field has a target, move the value to the target let field_name = field.input_field(); if let Some(target_name) = field.target_field() { - if let Some(v) = 
val.remove(field_name) { - val.insert(target_name.to_string(), v); + if let Some(v) = v_map.remove(field_name) { + v_map.insert(target_name.to_string(), v); } include_key_set.insert(target_name); } else { include_key_set.insert(field_name); } } - val.retain(|k, _| include_key_set.contains(k.as_str())); + v_map.retain(|k, _| include_key_set.contains(k.as_str())); } SelectType::Exclude => { for field in self.fields.iter() { - val.remove(field.input_field()); + v_map.remove(field.input_field()); } } } @@ -127,9 +131,11 @@ impl Processor for SelectProcessor { #[cfg(test)] mod test { + use std::collections::BTreeMap; + use crate::etl::field::{Field, Fields}; use crate::etl::processor::select::{SelectProcessor, SelectType}; - use crate::{PipelineMap, Processor, Value}; + use crate::{Map, Processor, Value}; #[test] fn test_select() { @@ -138,13 +144,14 @@ mod test { select_type: SelectType::Include, }; - let mut p = PipelineMap::new(); + let mut p = BTreeMap::new(); p.insert("hello".to_string(), Value::String("world".to_string())); p.insert("hello2".to_string(), Value::String("world2".to_string())); - let result = processor.exec_mut(p); + let result = processor.exec_mut(Value::Map(Map { values: p })); assert!(result.is_ok()); - let p = result.unwrap(); + let mut result = result.unwrap(); + let p = result.as_map_mut().unwrap(); assert_eq!(p.len(), 1); assert_eq!(p.get("hello"), Some(&Value::String("world".to_string()))); } @@ -156,13 +163,14 @@ mod test { select_type: SelectType::Include, }; - let mut p = PipelineMap::new(); + let mut p = BTreeMap::new(); p.insert("hello".to_string(), Value::String("world".to_string())); p.insert("hello2".to_string(), Value::String("world2".to_string())); - let result = processor.exec_mut(p); + let result = processor.exec_mut(Value::Map(Map { values: p })); assert!(result.is_ok()); - let p = result.unwrap(); + let mut result = result.unwrap(); + let p = result.as_map_mut().unwrap(); assert_eq!(p.len(), 1); assert_eq!(p.get("hello3"), Some(&Value::String("world".to_string()))); } @@ -174,13 +182,14 @@ mod test { select_type: SelectType::Exclude, }; - let mut p = PipelineMap::new(); + let mut p = BTreeMap::new(); p.insert("hello".to_string(), Value::String("world".to_string())); p.insert("hello2".to_string(), Value::String("world2".to_string())); - let result = processor.exec_mut(p); + let result = processor.exec_mut(Value::Map(Map { values: p })); assert!(result.is_ok()); - let p = result.unwrap(); + let mut result = result.unwrap(); + let p = result.as_map_mut().unwrap(); assert_eq!(p.len(), 1); assert_eq!(p.get("hello"), None); assert_eq!(p.get("hello2"), Some(&Value::String("world2".to_string()))); diff --git a/src/pipeline/src/etl/processor/simple_extract.rs b/src/pipeline/src/etl/processor/simple_extract.rs index fd94692742..0fcf2c4979 100644 --- a/src/pipeline/src/etl/processor/simple_extract.rs +++ b/src/pipeline/src/etl/processor/simple_extract.rs @@ -20,7 +20,7 @@ use crate::etl::processor::{ yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, KEY_NAME, }; -use crate::{PipelineMap, Processor, Value}; +use crate::{Processor, Value}; pub(crate) const PROCESSOR_SIMPLE_EXTRACT: &str = "simple_extract"; @@ -98,14 +98,14 @@ impl Processor for SimpleExtractProcessor { self.ignore_missing } - fn exec_mut(&self, mut val: PipelineMap) -> Result { + fn exec_mut(&self, mut val: Value) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { Some(v) => { let processed = 
self.process_field(v)?; let output_index = field.target_or_input_field(); - val.insert(output_index.to_string(), processed); + val.insert(output_index.to_string(), processed)?; } None => { if !self.ignore_missing { diff --git a/src/pipeline/src/etl/processor/timestamp.rs b/src/pipeline/src/etl/processor/timestamp.rs index d36d659dbb..4ea79b0356 100644 --- a/src/pipeline/src/etl/processor/timestamp.rs +++ b/src/pipeline/src/etl/processor/timestamp.rs @@ -36,7 +36,6 @@ use crate::etl::value::time::{ SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION, }; use crate::etl::value::{Timestamp, Value}; -use crate::etl::PipelineMap; pub(crate) const PROCESSOR_TIMESTAMP: &str = "timestamp"; const RESOLUTION_NAME: &str = "resolution"; @@ -302,7 +301,7 @@ impl Processor for TimestampProcessor { self.ignore_missing } - fn exec_mut(&self, mut val: PipelineMap) -> Result { + fn exec_mut(&self, mut val: Value) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { @@ -318,7 +317,7 @@ impl Processor for TimestampProcessor { Some(v) => { let result = self.parse(v)?; let output_key = field.target_or_input_field(); - val.insert(output_key.to_string(), Value::Timestamp(result)); + val.insert(output_key.to_string(), Value::Timestamp(result))?; } } } diff --git a/src/pipeline/src/etl/processor/urlencoding.rs b/src/pipeline/src/etl/processor/urlencoding.rs index 3eb515484a..e56076b1dd 100644 --- a/src/pipeline/src/etl/processor/urlencoding.rs +++ b/src/pipeline/src/etl/processor/urlencoding.rs @@ -25,7 +25,6 @@ use crate::etl::processor::{ IGNORE_MISSING_NAME, METHOD_NAME, }; use crate::etl::value::Value; -use crate::PipelineMap; pub(crate) const PROCESSOR_URL_ENCODING: &str = "urlencoding"; @@ -126,14 +125,14 @@ impl crate::etl::processor::Processor for UrlEncodingProcessor { self.ignore_missing } - fn exec_mut(&self, mut val: PipelineMap) -> Result { + fn exec_mut(&self, mut val: Value) -> Result { for field in self.fields.iter() { let index = field.input_field(); match val.get(index) { Some(Value::String(s)) => { let result = self.process_field(s)?; let output_index = field.target_or_input_field(); - val.insert(output_index.to_string(), result); + val.insert(output_index.to_string(), result)?; } Some(Value::Null) | None => { if !self.ignore_missing { diff --git a/src/pipeline/src/etl/processor/vrl.rs b/src/pipeline/src/etl/processor/vrl.rs index eeac4d49a6..b2a90b5955 100644 --- a/src/pipeline/src/etl/processor/vrl.rs +++ b/src/pipeline/src/etl/processor/vrl.rs @@ -27,7 +27,7 @@ use crate::error::{ InvalidTimestampSnafu, KeyMustBeStringSnafu, Result, VrlRegexValueSnafu, VrlReturnValueSnafu, }; use crate::etl::processor::yaml_string; -use crate::{PipelineMap, Value as PipelineValue}; +use crate::Value as PipelineValue; pub(crate) const PROCESSOR_VRL: &str = "vrl"; const SOURCE: &str = "source"; @@ -62,14 +62,11 @@ impl VrlProcessor { Ok(Self { source, program }) } - pub fn resolve(&self, m: PipelineMap) -> Result { - let pipeline_vrl = m - .into_iter() - .map(|(k, v)| pipeline_value_to_vrl_value(v).map(|v| (KeyString::from(k), v))) - .collect::>>()?; + pub fn resolve(&self, m: PipelineValue) -> Result { + let pipeline_vrl = pipeline_value_to_vrl_value(m)?; let mut target = TargetValue { - value: VrlValue::Object(pipeline_vrl), + value: pipeline_vrl, metadata: VrlValue::Object(BTreeMap::new()), secrets: Secrets::default(), }; @@ -116,11 +113,11 @@ impl crate::etl::processor::Processor for VrlProcessor { true } - fn exec_mut(&self, val: PipelineMap) -> Result { + fn 
exec_mut(&self, val: PipelineValue) -> Result { let val = self.resolve(val)?; if let PipelineValue::Map(m) = val { - Ok(m.values) + Ok(PipelineValue::Map(m.values.into())) } else { VrlRegexValueSnafu.fail() } @@ -244,19 +241,19 @@ del(.user_info) assert!(v.is_ok()); let v = v.unwrap(); - let mut n = PipelineMap::new(); + let mut n = BTreeMap::new(); n.insert( "name".to_string(), PipelineValue::String("certain_name".to_string()), ); - let mut m = PipelineMap::new(); + let mut m = BTreeMap::new(); m.insert( "user_info".to_string(), PipelineValue::Map(Map { values: n }), ); - let re = v.resolve(m); + let re = v.resolve(PipelineValue::Map(Map { values: m })); assert!(re.is_ok()); let re = re.unwrap(); diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index f35f005b6b..6c2b4544d0 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -14,7 +14,7 @@ pub mod coerce; -use std::collections::HashSet; +use std::collections::{BTreeMap, HashSet}; use std::sync::Arc; use ahash::{HashMap, HashMapExt}; @@ -29,19 +29,19 @@ use itertools::Itertools; use once_cell::sync::OnceCell; use serde_json::Number; use session::context::Channel; +use snafu::OptionExt; use crate::error::{ IdentifyPipelineColumnTypeMismatchSnafu, ReachedMaxNestedLevelsSnafu, Result, TransformColumnNameMustBeUniqueSnafu, TransformMultipleTimestampIndexSnafu, - TransformTimestampIndexCountSnafu, UnsupportedNumberTypeSnafu, + TransformTimestampIndexCountSnafu, UnsupportedNumberTypeSnafu, ValueMustBeMapSnafu, }; use crate::etl::ctx_req::ContextOpt; use crate::etl::field::{Field, Fields}; use crate::etl::transform::index::Index; use crate::etl::transform::{Transform, Transforms}; use crate::etl::value::{Timestamp, Value}; -use crate::etl::PipelineMap; -use crate::PipelineContext; +use crate::{Map, PipelineContext}; const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp"; const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10; @@ -186,8 +186,8 @@ impl GreptimeTransformer { } } - pub fn transform_mut(&self, pipeline_map: &mut PipelineMap) -> Result<(ContextOpt, Row)> { - let opt = ContextOpt::from_pipeline_map_to_opt(pipeline_map); + pub fn transform_mut(&self, pipeline_map: &mut Value) -> Result<(ContextOpt, Row)> { + let opt = ContextOpt::from_pipeline_map_to_opt(pipeline_map)?; let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()]; let mut output_index = 0; @@ -337,7 +337,7 @@ fn resolve_number_schema( ) } -fn calc_ts(p_ctx: &PipelineContext, values: &PipelineMap) -> Result> { +fn calc_ts(p_ctx: &PipelineContext, values: &Value) -> Result> { match p_ctx.channel { Channel::Prometheus => Ok(Some(ValueData::TimestampMillisecondValue( values @@ -362,7 +362,7 @@ fn calc_ts(p_ctx: &PipelineContext, values: &PipelineMap) -> Result, ) -> Result { let mut row: Vec = Vec::with_capacity(schema_info.schema.len()); @@ -382,6 +382,8 @@ fn values_to_row( .as_ref() .map_or(DEFAULT_GREPTIME_TIMESTAMP_COLUMN, |ts| ts.get_column_name()); + let values = values.into_map().context(ValueMustBeMapSnafu)?; + for (column_name, value) in values { if column_name == ts_column_name { continue; @@ -518,7 +520,7 @@ fn resolve_value( } fn identity_pipeline_inner( - pipeline_maps: Vec, + pipeline_maps: Vec, pipeline_ctx: &PipelineContext<'_>, ) -> Result<(SchemaInfo, HashMap>)> { let mut schema_info = SchemaInfo::default(); @@ -545,7 +547,7 @@ fn identity_pipeline_inner( let len = 
pipeline_maps.len(); for mut pipeline_map in pipeline_maps { - let opt = ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map); + let opt = ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map)?; let row = values_to_row(&mut schema_info, pipeline_map, pipeline_ctx)?; opt_map @@ -576,7 +578,7 @@ fn identity_pipeline_inner( /// 4. The pipeline will return an error if the same column datatype is mismatched /// 5. The pipeline will analyze the schema of each json record and merge them to get the final schema. pub fn identity_pipeline( - array: Vec, + array: Vec, table: Option>, pipeline_ctx: &PipelineContext<'_>, ) -> Result> { @@ -584,7 +586,7 @@ pub fn identity_pipeline( array .into_iter() .map(|item| flatten_object(item, DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING)) - .collect::>>()? + .collect::>>()? } else { array }; @@ -618,21 +620,22 @@ pub fn identity_pipeline( /// /// The `max_nested_levels` parameter is used to limit the nested levels of the JSON object. /// The error will be returned if the nested levels is greater than the `max_nested_levels`. -pub fn flatten_object(object: PipelineMap, max_nested_levels: usize) -> Result { - let mut flattened = PipelineMap::new(); +pub fn flatten_object(object: Value, max_nested_levels: usize) -> Result { + let mut flattened = BTreeMap::new(); + let object = object.into_map().context(ValueMustBeMapSnafu)?; if !object.is_empty() { // it will use recursion to flatten the object. do_flatten_object(&mut flattened, None, object, 1, max_nested_levels)?; } - Ok(flattened) + Ok(Value::Map(Map { values: flattened })) } fn do_flatten_object( - dest: &mut PipelineMap, + dest: &mut BTreeMap, base: Option<&str>, - object: PipelineMap, + object: BTreeMap, current_level: usize, max_nested_levels: usize, ) -> Result<()> { diff --git a/src/pipeline/src/etl/value.rs b/src/pipeline/src/etl/value.rs index 51ac76c4a5..3eb7294720 100644 --- a/src/pipeline/src/etl/value.rs +++ b/src/pipeline/src/etl/value.rs @@ -16,6 +16,7 @@ pub mod array; pub mod map; pub mod time; +use std::collections::BTreeMap; use std::result::Result as StdResult; pub use array::Array; @@ -30,15 +31,16 @@ pub use time::Timestamp; use crate::error::{ Error, Result, UnsupportedNumberTypeSnafu, ValueDefaultValueUnsupportedSnafu, - ValueInvalidResolutionSnafu, ValueParseBooleanSnafu, ValueParseFloatSnafu, ValueParseIntSnafu, - ValueParseTypeSnafu, ValueUnsupportedYamlTypeSnafu, ValueYamlKeyMustBeStringSnafu, + ValueInvalidResolutionSnafu, ValueMustBeMapSnafu, ValueParseBooleanSnafu, ValueParseFloatSnafu, + ValueParseIntSnafu, ValueParseTypeSnafu, ValueUnsupportedYamlTypeSnafu, + ValueYamlKeyMustBeStringSnafu, }; -use crate::etl::PipelineMap; + +pub type PipelineMap = Value; /// Value can be used as type /// acts as value: the enclosed value is the actual value /// acts as type: the enclosed value is the default value - #[derive(Debug, Clone, PartialEq, Default)] pub enum Value { // as value: null @@ -70,6 +72,47 @@ pub enum Value { } impl Value { + pub fn get(&self, key: &str) -> Option<&Self> { + match self { + Value::Map(map) => map.get(key), + _ => None, + } + } + + pub fn get_mut(&mut self, key: &str) -> Option<&mut Self> { + match self { + Value::Map(map) => map.get_mut(key), + _ => None, + } + } + + pub fn remove(&mut self, key: &str) -> Option { + match self { + Value::Map(map) => map.remove(key), + _ => None, + } + } + + pub fn extend(&mut self, other: Map) -> Result<()> { + match self { + Value::Map(map) => { + map.extend(other); + Ok(()) + } + _ => ValueMustBeMapSnafu.fail(), + } + } + 
+
+    pub fn insert(&mut self, key: String, value: Value) -> Result<()> {
+        match self {
+            Value::Map(map) => {
+                map.insert(key, value);
+                Ok(())
+            }
+            _ => ValueMustBeMapSnafu.fail(),
+        }
+    }
+
     pub fn is_null(&self) -> bool {
         matches!(self, Value::Null)
     }
@@ -236,13 +279,6 @@ impl Value {
         }
     }
 
-    pub fn get(&self, key: &str) -> Option<&Self> {
-        match self {
-            Value::Map(map) => map.get(key),
-            _ => None,
-        }
-    }
-
     pub fn as_str(&self) -> Option<&str> {
         match self {
             Value::String(v) => Some(v),
@@ -289,6 +325,20 @@ impl Value {
         }
     }
 
+    pub fn as_map_mut(&mut self) -> Option<&mut BTreeMap<String, Value>> {
+        match self {
+            Value::Map(map) => Some(map),
+            _ => None,
+        }
+    }
+
+    pub fn into_map(self) -> Option<BTreeMap<String, Value>> {
+        match self {
+            Value::Map(map) => Some(map.values),
+            _ => None,
+        }
+    }
+
     // ref https://github.com/serde-rs/json/blob/master/src/value/mod.rs#L779
     pub fn pointer(&self, pointer: &str) -> Option<&Value> {
         if pointer.is_empty() {
@@ -388,7 +438,7 @@ impl TryFrom<simd_json::OwnedValue> for Value {
                 Ok(Value::Array(Array { values: re }))
             }
             simd_json::OwnedValue::Object(map) => {
-                let mut values = PipelineMap::new();
+                let mut values = BTreeMap::new();
                 for (k, v) in map.into_iter() {
                     values.insert(k, Value::try_from(v)?);
                 }
@@ -425,7 +475,7 @@ impl TryFrom<serde_json::Value> for Value {
                 Ok(Value::Array(Array { values }))
             }
             serde_json::Value::Object(v) => {
-                let mut values = PipelineMap::new();
+                let mut values = BTreeMap::new();
                 for (k, v) in v {
                     values.insert(k, Value::try_from(v)?);
                 }
@@ -456,7 +506,7 @@ impl TryFrom<&yaml_rust::Yaml> for Value {
                 Ok(Value::Array(Array { values }))
             }
             yaml_rust::Yaml::Hash(v) => {
-                let mut values = PipelineMap::new();
+                let mut values = BTreeMap::new();
                 for (k, v) in v {
                     let key = k
                         .as_str()
diff --git a/src/pipeline/src/etl/value/map.rs b/src/pipeline/src/etl/value/map.rs
index b406a69343..0c92a036c3 100644
--- a/src/pipeline/src/etl/value/map.rs
+++ b/src/pipeline/src/etl/value/map.rs
@@ -12,12 +12,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::BTreeMap;
+
 use crate::etl::value::Value;
-use crate::PipelineMap;
 
 #[derive(Debug, Clone, PartialEq, Default)]
 pub struct Map {
-    pub values: PipelineMap,
+    pub values: BTreeMap<String, Value>,
 }
 
 impl Map {
@@ -36,14 +37,14 @@ impl Map {
     }
 }
 
-impl From<PipelineMap> for Map {
-    fn from(values: PipelineMap) -> Self {
+impl From<BTreeMap<String, Value>> for Map {
+    fn from(values: BTreeMap<String, Value>) -> Self {
         Self { values }
     }
 }
 
 impl std::ops::Deref for Map {
-    type Target = PipelineMap;
+    type Target = BTreeMap<String, Value>;
 
     fn deref(&self) -> &Self::Target {
         &self.values
diff --git a/src/pipeline/src/lib.rs b/src/pipeline/src/lib.rs
index 38248eb767..ab3649a196 100644
--- a/src/pipeline/src/lib.rs
+++ b/src/pipeline/src/lib.rs
@@ -27,8 +27,7 @@ pub use etl::transform::GreptimeTransformer;
 pub use etl::value::{Array, Map, Value};
 pub use etl::{
     json_array_to_map, json_to_map, parse, simd_json_array_to_map, simd_json_to_map,
-    AutoTransformOutput, Content, DispatchedTo, Pipeline, PipelineExecOutput, PipelineMap,
-    TransformedOutput,
+    AutoTransformOutput, Content, DispatchedTo, Pipeline, PipelineExecOutput, TransformedOutput,
 };
 pub use manager::{
     pipeline_operator, table, util, IdentityTimeIndex, PipelineContext, PipelineDefinition,
diff --git a/src/pipeline/src/tablesuffix.rs b/src/pipeline/src/tablesuffix.rs
index 01f41fbf6e..3c51dad980 100644
--- a/src/pipeline/src/tablesuffix.rs
+++ b/src/pipeline/src/tablesuffix.rs
@@ -20,7 +20,7 @@ use yaml_rust::Yaml;
 use crate::error::{
     Error, InvalidTableSuffixTemplateSnafu, RequiredTableSuffixTemplateSnafu, Result,
 };
-use crate::{PipelineMap, Value};
+use crate::Value;
 
 const REPLACE_KEY: &str = "{}";
 
@@ -47,7 +47,7 @@ pub(crate) struct TableSuffixTemplate {
 }
 
 impl TableSuffixTemplate {
-    pub fn apply(&self, val: &PipelineMap) -> Option<String> {
+    pub fn apply(&self, val: &Value) -> Option<String> {
         let values = self
             .keys
             .iter()
diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs
index 7db16bcda9..1686b71362 100644
--- a/src/servers/src/http/event.rs
+++ b/src/servers/src/http/event.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::BTreeMap;
 use std::fmt::Display;
 use std::io::BufRead;
 use std::str::FromStr;
@@ -34,10 +35,10 @@ use headers::ContentType;
 use lazy_static::lazy_static;
 use pipeline::util::to_pipeline_version;
 use pipeline::{
-    ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, PipelineMap,
+    ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition,
     Value as PipelineValue,
 };
 use serde::{Deserialize, Serialize};
-use serde_json::{json, Deserializer, Map, Value};
+use serde_json::{json, Deserializer, Map, Value as JsonValue};
 use session::context::{Channel, QueryContext, QueryContextRef};
 use snafu::{ensure, OptionExt, ResultExt};
 use strum::{EnumIter, IntoEnumIterator};
@@ -106,7 +107,7 @@ pub(crate) struct PipelineIngestRequest {
     /// The table where the log data will be written to.
     pub table: String,
     /// The log data to be ingested.
- pub values: Vec, + pub values: Vec, } pub struct PipelineContent(String); @@ -284,18 +285,18 @@ pub async fn delete_pipeline( /// Transform NDJSON array into a single array /// always return an array fn transform_ndjson_array_factory( - values: impl IntoIterator>, + values: impl IntoIterator>, ignore_error: bool, -) -> Result> { +) -> Result> { values .into_iter() .try_fold(Vec::with_capacity(100), |mut acc_array, item| match item { Ok(item_value) => { match item_value { - Value::Array(item_array) => { + JsonValue::Array(item_array) => { acc_array.extend(item_array); } - Value::Object(_) => { + JsonValue::Object(_) => { acc_array.push(item_value); } _ => { @@ -320,7 +321,7 @@ fn transform_ndjson_array_factory( /// Dryrun pipeline with given data async fn dryrun_pipeline_inner( - value: Vec, + value: Vec, pipeline: Arc, pipeline_handler: PipelineHandlerRef, query_ctx: &QueryContextRef, @@ -356,24 +357,27 @@ async fn dryrun_pipeline_inner( .iter() .map(|cs| { let mut map = Map::new(); - map.insert(name_key.to_string(), Value::String(cs.column_name.clone())); + map.insert( + name_key.to_string(), + JsonValue::String(cs.column_name.clone()), + ); map.insert( data_type_key.to_string(), - Value::String(cs.datatype().as_str_name().to_string()), + JsonValue::String(cs.datatype().as_str_name().to_string()), ); map.insert( colume_type_key.to_string(), - Value::String(cs.semantic_type().as_str_name().to_string()), + JsonValue::String(cs.semantic_type().as_str_name().to_string()), ); map.insert( "fulltext".to_string(), - Value::Bool( + JsonValue::Bool( cs.options .clone() .is_some_and(|x| x.options.contains_key("fulltext")), ), ); - Value::Object(map) + JsonValue::Object(map) }) .collect::>(); @@ -401,26 +405,26 @@ async fn dryrun_pipeline_inner( "data_type".to_string(), schema[idx][data_type_key].clone(), ); - Value::Object(map) + JsonValue::Object(map) }) - .unwrap_or(Value::Null) + .unwrap_or(JsonValue::Null) }) .collect() }) .collect(); let mut result = Map::new(); - result.insert("schema".to_string(), Value::Array(schema)); - result.insert("rows".to_string(), Value::Array(rows)); - result.insert("table_name".to_string(), Value::String(table_name)); - let result = Value::Object(result); + result.insert("schema".to_string(), JsonValue::Array(schema)); + result.insert("rows".to_string(), JsonValue::Array(rows)); + result.insert("table_name".to_string(), JsonValue::String(table_name)); + let result = JsonValue::Object(result); Some(result) } else { None } }) .collect(); - Ok(Json(Value::Array(results)).into_response()) + Ok(Json(JsonValue::Array(results)).into_response()) } /// Dryrun pipeline with given data @@ -480,7 +484,7 @@ fn add_step_info_for_pipeline_dryrun_error(step_msg: &str, e: Error) -> Response /// Parse the data with given content type /// If the content type is invalid, return error /// content type is one of application/json, text/plain, application/x-ndjson -fn parse_dryrun_data(data_type: String, data: String) -> Result> { +fn parse_dryrun_data(data_type: String, data: String) -> Result> { if let Ok(content_type) = ContentType::from_str(&data_type) { extract_pipeline_value_by_content_type(content_type, Bytes::from(data), false) } else { @@ -709,7 +713,7 @@ impl<'a> TryFrom<&'a ContentType> for EventPayloadResolver<'a> { } impl EventPayloadResolver<'_> { - fn parse_payload(&self, payload: Bytes, ignore_errors: bool) -> Result> { + fn parse_payload(&self, payload: Bytes, ignore_errors: bool) -> Result> { match self.inner { EventPayloadResolverInner::Json => { 
pipeline::json_array_to_map(transform_ndjson_array_factory( @@ -754,9 +758,9 @@ impl EventPayloadResolver<'_> { .lines() .filter_map(|line| line.ok().filter(|line| !line.is_empty())) .map(|line| { - let mut map = PipelineMap::new(); - map.insert("message".to_string(), pipeline::Value::String(line)); - map + let mut map = BTreeMap::new(); + map.insert("message".to_string(), PipelineValue::String(line)); + PipelineValue::Map(map.into()) }) .collect::>(); Ok(result) @@ -769,7 +773,7 @@ fn extract_pipeline_value_by_content_type( content_type: ContentType, payload: Bytes, ignore_errors: bool, -) -> Result> { +) -> Result> { EventPayloadResolver::try_from(&content_type).and_then(|resolver| { resolver .parse_payload(payload, ignore_errors) @@ -878,28 +882,28 @@ mod tests { #[test] fn test_transform_ndjson() { let s = "{\"a\": 1}\n{\"b\": 2}"; - let a = Value::Array( + let a = JsonValue::Array( transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(), ) .to_string(); assert_eq!(a, "[{\"a\":1},{\"b\":2}]"); let s = "{\"a\": 1}"; - let a = Value::Array( + let a = JsonValue::Array( transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(), ) .to_string(); assert_eq!(a, "[{\"a\":1}]"); let s = "[{\"a\": 1}]"; - let a = Value::Array( + let a = JsonValue::Array( transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(), ) .to_string(); assert_eq!(a, "[{\"a\":1}]"); let s = "[{\"a\": 1}, {\"b\": 2}]"; - let a = Value::Array( + let a = JsonValue::Array( transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(), ) .to_string(); @@ -928,10 +932,12 @@ mod tests { extract_pipeline_value_by_content_type(NDJSON_CONTENT_TYPE.clone(), payload, true); assert!(fail_only_wrong.is_ok()); - let mut map1 = PipelineMap::new(); - map1.insert("a".to_string(), pipeline::Value::Uint64(1)); - let mut map2 = PipelineMap::new(); - map2.insert("c".to_string(), pipeline::Value::Uint64(1)); + let mut map1 = BTreeMap::new(); + map1.insert("a".to_string(), PipelineValue::Uint64(1)); + let map1 = PipelineValue::Map(map1.into()); + let mut map2 = BTreeMap::new(); + map2.insert("c".to_string(), PipelineValue::Uint64(1)); + let map2 = PipelineValue::Map(map2.into()); assert_eq!(fail_only_wrong.unwrap(), vec![map1, map2]); } } diff --git a/src/servers/src/interceptor.rs b/src/servers/src/interceptor.rs index 5a6e94723a..750924142a 100644 --- a/src/servers/src/interceptor.rs +++ b/src/servers/src/interceptor.rs @@ -23,7 +23,7 @@ use common_error::ext::ErrorExt; use common_query::Output; use datafusion_expr::LogicalPlan; use log_query::LogQuery; -use pipeline::PipelineMap; +use pipeline::Value; use query::parser::PromQuery; use session::context::QueryContextRef; use sql::statements::statement::Statement; @@ -385,9 +385,9 @@ pub trait LogIngestInterceptor { /// Called before pipeline execution. 
fn pre_pipeline(
         &self,
-        values: Vec<PipelineMap>,
+        values: Vec<Value>,
         _query_ctx: QueryContextRef,
-    ) -> Result<Vec<PipelineMap>, Self::Error> {
+    ) -> Result<Vec<Value>, Self::Error> {
         Ok(values)
     }
 
@@ -412,9 +412,9 @@ where
 
     fn pre_pipeline(
         &self,
-        values: Vec<PipelineMap>,
+        values: Vec<Value>,
         query_ctx: QueryContextRef,
-    ) -> Result<Vec<PipelineMap>, Self::Error> {
+    ) -> Result<Vec<Value>, Self::Error> {
         if let Some(this) = self {
             this.pre_pipeline(values, query_ctx)
         } else {
diff --git a/src/servers/src/pipeline.rs b/src/servers/src/pipeline.rs
index 5f3ff67aff..1a14441a0f 100644
--- a/src/servers/src/pipeline.rs
+++ b/src/servers/src/pipeline.rs
@@ -21,7 +21,7 @@ use itertools::Itertools;
 use pipeline::error::AutoTransformOneTimestampSnafu;
 use pipeline::{
     AutoTransformOutput, ContextReq, DispatchedTo, IdentityTimeIndex, Pipeline, PipelineContext,
-    PipelineDefinition, PipelineExecOutput, PipelineMap, TransformedOutput,
+    PipelineDefinition, PipelineExecOutput, TransformedOutput, Value,
     GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
 };
 use session::context::{Channel, QueryContextRef};
@@ -116,7 +116,7 @@ async fn run_custom_pipeline(
     } = pipeline_req;
     let arr_len = pipeline_maps.len();
     let mut transformed_map = HashMap::new();
-    let mut dispatched: BTreeMap<DispatchedTo, Vec<PipelineMap>> = BTreeMap::new();
+    let mut dispatched: BTreeMap<DispatchedTo, Vec<Value>> = BTreeMap::new();
 
     let mut auto_map = HashMap::new();
     let mut auto_map_ts_keys = HashMap::new();
diff --git a/src/servers/src/proto.rs b/src/servers/src/proto.rs
index 0a8ec5271b..c4bc924f0b 100644
--- a/src/servers/src/proto.rs
+++ b/src/servers/src/proto.rs
@@ -22,9 +22,7 @@ use api::v1::RowInsertRequest;
 use bytes::{Buf, Bytes};
 use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
 use common_telemetry::debug;
-use pipeline::{
-    ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, PipelineMap, Value,
-};
+use pipeline::{ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, Value};
 use prost::encoding::message::merge;
 use prost::encoding::{decode_key, decode_varint, WireType};
 use prost::DecodeError;
@@ -338,7 +336,7 @@ impl PromWriteRequest {
 /// let's keep it that way for now.
 pub struct PromSeriesProcessor {
     pub(crate) use_pipeline: bool,
-    pub(crate) table_values: BTreeMap<String, Vec<PipelineMap>>,
+    pub(crate) table_values: BTreeMap<String, Vec<Value>>,
 
     // optional fields for pipeline
     pub(crate) pipeline_handler: Option<PipelineHandlerRef>,
@@ -374,8 +372,8 @@ impl PromSeriesProcessor {
         &mut self,
         series: &mut PromTimeSeries,
     ) -> Result<(), DecodeError> {
-        let mut vec_pipeline_map: Vec<PipelineMap> = Vec::new();
-        let mut pipeline_map = PipelineMap::new();
+        let mut vec_pipeline_map: Vec<Value> = Vec::new();
+        let mut pipeline_map = BTreeMap::new();
         for l in series.labels.iter() {
             let name = String::from_utf8(l.name.to_vec())
                 .map_err(|_| DecodeError::new("invalid utf-8"))?;
@@ -391,10 +389,10 @@
             pipeline_map.insert(GREPTIME_TIMESTAMP.to_string(), Value::Int64(timestamp));
             pipeline_map.insert(GREPTIME_VALUE.to_string(), Value::Float64(s.value));
             if one_sample {
-                vec_pipeline_map.push(pipeline_map);
+                vec_pipeline_map.push(Value::Map(pipeline_map.into()));
                 break;
            } else {
-                vec_pipeline_map.push(pipeline_map.clone());
+                vec_pipeline_map.push(Value::Map(pipeline_map.clone().into()));
             }
         }
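
Editor's note (appended; not part of the patch): the sketch below illustrates how the
Value-centric API introduced by this refactor fits together, using only helpers visible
in the diff above (`Value::insert`/`Value::extend` now return `Result` and fail with
`ValueMustBeMap` on non-map receivers; `as_map_mut`/`into_map` expose the inner
`BTreeMap<String, Value>`; `json_to_map` yields a top-level `Value::Map`). The function
name and keys are illustrative only, and it assumes `serde_json` is available to the caller.

use std::collections::BTreeMap;

use pipeline::{json_to_map, Value};

fn value_api_sketch() -> pipeline::error::Result<()> {
    // Build intermediate state the way the updated tests do: a
    // BTreeMap<String, Value> wrapped into Value::Map via the From impl.
    let mut payload = BTreeMap::new();
    payload.insert("message".to_string(), Value::String("hello".to_string()));
    let mut val = Value::Map(payload.into());

    // Map-style mutation now goes through Value itself and returns an
    // error on non-map values instead of being unrepresentable.
    val.insert("status".to_string(), Value::Uint64(200))?;
    assert!(val.get("message").is_some());

    // Parsing JSON input likewise produces a top-level Value::Map.
    let parsed = json_to_map(serde_json::json!({"a": 1}))?;
    assert!(parsed.get("a").is_some());
    Ok(())
}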