refactor: remove PipelineMap and use Value instead (#6278)

* refactor: remove pipeline_map and use value instead

* chore: remove unused comments

* chore: move error to illegal state
shuiyisong
2025-06-11 10:02:32 -07:00
committed by GitHub
parent 03bb6e4f28
commit 041b683a8d
32 changed files with 273 additions and 207 deletions
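
At a glance, this commit replaces the standalone PipelineMap alias (a BTreeMap<String, Value>) with the Value enum itself as the intermediate row representation, adding map-style helpers (get, get_mut, remove, insert, extend, as_map_mut, into_map) to Value. A minimal before/after sketch of a call site, based on the hunks below; the function and field names are illustrative and not taken from the codebase:

use std::collections::BTreeMap;

// Hypothetical call site; the types and helpers come from the hunks below.
fn build_row() -> Result<Value> {
    // Before this change the intermediate state was a bare PipelineMap (BTreeMap<String, Value>).
    let mut payload = BTreeMap::new();
    payload.insert("message".to_string(), Value::String("hello".to_string()));
    // After: the same data travels as a Value::Map, and writes go through fallible helpers.
    let mut row = Value::Map(payload.into());
    row.insert("host".to_string(), Value::String("web-01".to_string()))?;
    Ok(row)
}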


@@ -21,7 +21,7 @@ use crate::error::{
ValueRequiredForDispatcherRuleSnafu,
};
use crate::etl::ctx_req::TABLE_SUFFIX_KEY;
use crate::{PipelineMap, Value};
use crate::Value;
const FIELD: &str = "field";
const PIPELINE: &str = "pipeline";
@@ -109,7 +109,7 @@ impl TryFrom<&Yaml> for Dispatcher {
impl Dispatcher {
/// Execute the dispatcher and return the matched rule, if any
pub(crate) fn exec(&self, data: &PipelineMap) -> Option<&Rule> {
pub(crate) fn exec(&self, data: &Value) -> Option<&Rule> {
if let Some(value) = data.get(&self.field) {
for rule in &self.rules {
if rule.value == *value {
@@ -119,7 +119,7 @@ impl Dispatcher {
None
} else {
debug!("field {} not found in keys {:?}", &self.field, data.keys());
debug!("field {} not found in keys {:?}", &self.field, data);
None
}
}


@@ -734,6 +734,12 @@ pub enum Error {
location: Location,
},
#[snafu(display("Top level value must be map"))]
ValueMustBeMap {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to build DataFusion logical plan"))]
BuildDfLogicalPlan {
#[snafu(source)]
@@ -809,7 +815,7 @@ impl ErrorExt for Error {
PipelineNotFound { .. }
| InvalidPipelineVersion { .. }
| InvalidCustomTimeIndex { .. } => StatusCode::InvalidArguments,
MultiPipelineWithDiffSchema { .. } => StatusCode::IllegalState,
MultiPipelineWithDiffSchema { .. } | ValueMustBeMap { .. } => StatusCode::IllegalState,
BuildDfLogicalPlan { .. } | RecordBatchLenNotMatch { .. } => StatusCode::Internal,
ExecuteInternalStatement { source, .. } => source.status_code(),
DataFrame { source, .. } => source.status_code(),
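
The new ValueMustBeMap variant is raised wherever code previously indexed a PipelineMap directly but the incoming Value is not a map; the hunks further down convert the Option returned by as_map_mut into this error via snafu's OptionExt. A small sketch of that pattern, assuming the names match this commit (require_map is an illustrative name):

use snafu::OptionExt;
use std::collections::BTreeMap;

// Option -> Result conversion producing the new error variant.
fn require_map(val: &mut Value) -> Result<&mut BTreeMap<String, Value>> {
    val.as_map_mut().context(ValueMustBeMapSnafu)
}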


@@ -19,6 +19,8 @@ pub mod processor;
pub mod transform;
pub mod value;
use std::collections::BTreeMap;
use ahash::{HashMap, HashMapExt};
use api::v1::Row;
use common_time::timestamp::TimeUnit;
@@ -31,7 +33,7 @@ use yaml_rust::YamlLoader;
use crate::dispatcher::{Dispatcher, Rule};
use crate::error::{
AutoTransformOneTimestampSnafu, InputValueMustBeObjectSnafu, IntermediateKeyIndexSnafu, Result,
YamlLoadSnafu, YamlParseSnafu,
ValueMustBeMapSnafu, YamlLoadSnafu, YamlParseSnafu,
};
use crate::etl::ctx_req::TABLE_SUFFIX_KEY;
use crate::etl::processor::ProcessorKind;
@@ -45,8 +47,6 @@ const TRANSFORMS: &str = "transforms";
const DISPATCHER: &str = "dispatcher";
const TABLESUFFIX: &str = "table_suffix";
pub type PipelineMap = std::collections::BTreeMap<String, Value>;
pub enum Content<'a> {
Json(&'a str),
Yaml(&'a str),
@@ -155,7 +155,7 @@ impl DispatchedTo {
pub enum PipelineExecOutput {
Transformed(TransformedOutput),
AutoTransform(AutoTransformOutput),
DispatchedTo(DispatchedTo, PipelineMap),
DispatchedTo(DispatchedTo, Value),
}
#[derive(Debug)]
@@ -163,7 +163,7 @@ pub struct TransformedOutput {
pub opt: ContextOpt,
pub row: Row,
pub table_suffix: Option<String>,
pub pipeline_map: PipelineMap,
pub pipeline_map: Value,
}
#[derive(Debug)]
@@ -171,7 +171,7 @@ pub struct AutoTransformOutput {
pub table_suffix: Option<String>,
// ts_column_name -> unit
pub ts_unit_map: HashMap<String, TimeUnit>,
pub pipeline_map: PipelineMap,
pub pipeline_map: Value,
}
impl PipelineExecOutput {
@@ -197,42 +197,42 @@ impl PipelineExecOutput {
}
}
pub fn json_to_map(val: serde_json::Value) -> Result<PipelineMap> {
pub fn json_to_map(val: serde_json::Value) -> Result<Value> {
match val {
serde_json::Value::Object(map) => {
let mut intermediate_state = PipelineMap::new();
let mut intermediate_state = BTreeMap::new();
for (k, v) in map {
intermediate_state.insert(k, Value::try_from(v)?);
}
Ok(intermediate_state)
Ok(Value::Map(intermediate_state.into()))
}
_ => InputValueMustBeObjectSnafu.fail(),
}
}
pub fn json_array_to_map(val: Vec<serde_json::Value>) -> Result<Vec<PipelineMap>> {
pub fn json_array_to_map(val: Vec<serde_json::Value>) -> Result<Vec<Value>> {
val.into_iter().map(json_to_map).collect()
}
pub fn simd_json_to_map(val: simd_json::OwnedValue) -> Result<PipelineMap> {
pub fn simd_json_to_map(val: simd_json::OwnedValue) -> Result<Value> {
match val {
simd_json::OwnedValue::Object(map) => {
let mut intermediate_state = PipelineMap::new();
let mut intermediate_state = BTreeMap::new();
for (k, v) in map.into_iter() {
intermediate_state.insert(k, Value::try_from(v)?);
}
Ok(intermediate_state)
Ok(Value::Map(intermediate_state.into()))
}
_ => InputValueMustBeObjectSnafu.fail(),
}
}
pub fn simd_json_array_to_map(val: Vec<simd_json::OwnedValue>) -> Result<Vec<PipelineMap>> {
pub fn simd_json_array_to_map(val: Vec<simd_json::OwnedValue>) -> Result<Vec<Value>> {
val.into_iter().map(simd_json_to_map).collect()
}
impl Pipeline {
pub fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineExecOutput> {
pub fn exec_mut(&self, mut val: Value) -> Result<PipelineExecOutput> {
// process
for processor in self.processors.iter() {
val = processor.exec_mut(val)?;
@@ -263,7 +263,7 @@ impl Pipeline {
let mut ts_unit_map = HashMap::with_capacity(4);
// get all ts values
for (k, v) in val.iter() {
for (k, v) in val.as_map_mut().context(ValueMustBeMapSnafu)? {
if let Value::Timestamp(ts) = v {
if !ts_unit_map.contains_key(k) {
ts_unit_map.insert(k.clone(), ts.get_unit());
@@ -378,8 +378,9 @@ transform:
type: timestamp, ns
index: time"#;
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_str)).unwrap();
let mut payload = PipelineMap::new();
let mut payload = BTreeMap::new();
payload.insert("message".to_string(), Value::String(message));
let payload = Value::Map(payload.into());
let result = pipeline
.exec_mut(payload)
.unwrap()
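
For context on the new entry points: json_to_map (and its simd_json sibling) now returns the top-level Value::Map directly, and that Value is what exec_mut consumes. A hedged sketch of a caller, assuming the signatures shown in this file's hunks; run_one is an illustrative name:

// Parse one JSON object into the intermediate Value and run the pipeline over it.
fn run_one(pipeline: &Pipeline, input: serde_json::Value) -> Result<PipelineExecOutput> {
    // Non-object input still fails with InputValueMustBeObject.
    let payload = json_to_map(input)?;
    pipeline.exec_mut(payload)
}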


@@ -18,9 +18,11 @@ use std::sync::Arc;
use ahash::{HashMap, HashMapExt};
use api::v1::{RowInsertRequest, RowInsertRequests, Rows};
use session::context::{QueryContext, QueryContextRef};
use snafu::OptionExt;
use crate::error::{Result, ValueMustBeMapSnafu};
use crate::tablesuffix::TableSuffixTemplate;
use crate::PipelineMap;
use crate::Value;
const GREPTIME_AUTO_CREATE_TABLE: &str = "greptime_auto_create_table";
const GREPTIME_TTL: &str = "greptime_ttl";
@@ -71,7 +73,8 @@ pub struct ContextOpt {
}
impl ContextOpt {
pub fn from_pipeline_map_to_opt(pipeline_map: &mut PipelineMap) -> Self {
pub fn from_pipeline_map_to_opt(pipeline_map: &mut Value) -> Result<Self> {
let pipeline_map = pipeline_map.as_map_mut().context(ValueMustBeMapSnafu)?;
let mut opt = Self::default();
for k in PIPELINE_HINT_KEYS {
if let Some(v) = pipeline_map.remove(k) {
@@ -101,13 +104,13 @@ impl ContextOpt {
}
}
}
opt
Ok(opt)
}
pub(crate) fn resolve_table_suffix(
&mut self,
table_suffix: Option<&TableSuffixTemplate>,
pipeline_map: &PipelineMap,
pipeline_map: &Value,
) -> Option<String> {
self.table_suffix
.take()


@@ -60,7 +60,7 @@ use crate::etl::processor::json_parse::JsonParseProcessor;
use crate::etl::processor::select::SelectProcessor;
use crate::etl::processor::simple_extract::SimpleExtractProcessor;
use crate::etl::processor::vrl::VrlProcessor;
use crate::etl::PipelineMap;
use crate::Value;
const FIELD_NAME: &str = "field";
const FIELDS_NAME: &str = "fields";
@@ -125,7 +125,7 @@ pub trait Processor: std::fmt::Debug + Send + Sync + 'static {
fn ignore_missing(&self) -> bool;
/// Execute the processor on a vector which has been preprocessed by the pipeline
fn exec_mut(&self, val: PipelineMap) -> Result<PipelineMap>;
fn exec_mut(&self, val: Value) -> Result<Value>;
}
#[derive(Debug)]


@@ -16,6 +16,8 @@
//!
//! Refer to [`CmcdProcessor`] for more information.
use std::collections::BTreeMap;
use snafu::{OptionExt, ResultExt};
use urlencoding::decode;
@@ -30,7 +32,6 @@ use crate::etl::processor::{
IGNORE_MISSING_NAME,
};
use crate::etl::value::Value;
use crate::etl::PipelineMap;
pub(crate) const PROCESSOR_CMCD: &str = "cmcd";
@@ -159,8 +160,8 @@ impl CmcdProcessor {
format!("{}_{}", prefix, key)
}
fn parse(&self, name: &str, value: &str) -> Result<PipelineMap> {
let mut working_set = PipelineMap::new();
fn parse(&self, name: &str, value: &str) -> Result<BTreeMap<String, Value>> {
let mut working_set = BTreeMap::new();
let parts = value.split(',');
@@ -249,14 +250,14 @@ impl Processor for CmcdProcessor {
self.ignore_missing
}
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
fn exec_mut(&self, mut val: Value) -> Result<Value> {
for field in self.fields.iter() {
let name = field.input_field();
match val.get(name) {
Some(Value::String(s)) => {
let results = self.parse(field.target_or_input_field(), s)?;
val.extend(results);
val.extend(results.into())?;
}
Some(Value::Null) | None => {
if !self.ignore_missing {
@@ -432,7 +433,7 @@ mod tests {
let expected = vec
.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect::<PipelineMap>();
.collect::<BTreeMap<String, Value>>();
let actual = processor.parse("prefix", &decoded).unwrap();
assert_eq!(actual, expected);
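
Processors that build up an intermediate BTreeMap<String, Value> (CMCD here, CSV and regex below) now fold it into the working Value with extend, going through the From<BTreeMap<String, Value>> for Map impl added later in this commit. A minimal sketch of that merge step, with an illustrative function and field name:

use std::collections::BTreeMap;

fn merge_parsed(val: &mut Value) -> Result<()> {
    // Stand-in for the processor's parse() output.
    let mut parsed = BTreeMap::new();
    parsed.insert("prefix_key".to_string(), Value::String("v".to_string()));
    // Fails with ValueMustBeMap when val is not a Value::Map.
    val.extend(parsed.into())?;
    Ok(())
}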


@@ -14,6 +14,8 @@
// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/csv-processor.html
use std::collections::BTreeMap;
use csv::{ReaderBuilder, Trim};
use itertools::EitherOrBoth::{Both, Left, Right};
use itertools::Itertools;
@@ -29,7 +31,6 @@ use crate::etl::processor::{
IGNORE_MISSING_NAME,
};
use crate::etl::value::Value;
use crate::etl::PipelineMap;
pub(crate) const PROCESSOR_CSV: &str = "csv";
@@ -59,7 +60,7 @@ pub struct CsvProcessor {
impl CsvProcessor {
// process the csv format string to a map with target_fields as keys
fn process(&self, val: &str) -> Result<PipelineMap> {
fn process(&self, val: &str) -> Result<BTreeMap<String, Value>> {
let mut reader = self.reader.from_reader(val.as_bytes());
if let Some(result) = reader.records().next() {
@@ -189,14 +190,14 @@ impl Processor for CsvProcessor {
self.ignore_missing
}
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
fn exec_mut(&self, mut val: Value) -> Result<Value> {
for field in self.fields.iter() {
let name = field.input_field();
match val.get(name) {
Some(Value::String(v)) => {
let results = self.process(v)?;
val.extend(results);
val.extend(results.into())?;
}
Some(Value::Null) | None => {
if !self.ignore_missing {
@@ -239,7 +240,7 @@ mod tests {
let result = processor.process("1,2").unwrap();
let values: PipelineMap = [
let values: BTreeMap<String, Value> = [
("a".into(), Value::String("1".into())),
("b".into(), Value::String("2".into())),
]
@@ -265,7 +266,7 @@ mod tests {
let result = processor.process("1,2").unwrap();
let values: PipelineMap = [
let values: BTreeMap<String, Value> = [
("a".into(), Value::String("1".into())),
("b".into(), Value::String("2".into())),
("c".into(), Value::Null),
@@ -290,7 +291,7 @@ mod tests {
let result = processor.process("1,2").unwrap();
let values: PipelineMap = [
let values: BTreeMap<String, Value> = [
("a".into(), Value::String("1".into())),
("b".into(), Value::String("2".into())),
("c".into(), Value::String("default".into())),
@@ -316,7 +317,7 @@ mod tests {
let result = processor.process("1,2").unwrap();
let values: PipelineMap = [
let values: BTreeMap<String, Value> = [
("a".into(), Value::String("1".into())),
("b".into(), Value::String("2".into())),
]


@@ -30,7 +30,6 @@ use crate::etl::processor::{
FIELD_NAME, IGNORE_MISSING_NAME,
};
use crate::etl::value::{Timestamp, Value};
use crate::etl::PipelineMap;
pub(crate) const PROCESSOR_DATE: &str = "date";
@@ -198,14 +197,14 @@ impl Processor for DateProcessor {
self.ignore_missing
}
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
fn exec_mut(&self, mut val: Value) -> Result<Value> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
Some(Value::String(s)) => {
let timestamp = self.parse(s)?;
let output_key = field.target_or_input_field();
val.insert(output_key.to_string(), Value::Timestamp(timestamp));
val.insert(output_key.to_string(), Value::Timestamp(timestamp))?;
}
Some(Value::Null) | None => {
if !self.ignore_missing {


@@ -30,7 +30,6 @@ use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME,
};
use crate::etl::value::Value;
use crate::etl::PipelineMap;
pub(crate) const PROCESSOR_DECOLORIZE: &str = "decolorize";
@@ -102,7 +101,7 @@ impl crate::etl::processor::Processor for DecolorizeProcessor {
self.ignore_missing
}
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
fn exec_mut(&self, mut val: Value) -> Result<Value> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
@@ -118,7 +117,7 @@ impl crate::etl::processor::Processor for DecolorizeProcessor {
Some(v) => {
let result = self.process(v)?;
let output_index = field.target_or_input_field();
val.insert(output_index.to_string(), result);
val.insert(output_index.to_string(), result)?;
}
}
}


@@ -33,7 +33,6 @@ use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME,
};
use crate::etl::value::Value;
use crate::etl::PipelineMap;
pub(crate) const PROCESSOR_DIGEST: &str = "digest";
@@ -201,7 +200,7 @@ impl crate::etl::processor::Processor for DigestProcessor {
self.ignore_missing
}
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
fn exec_mut(&self, mut val: Value) -> Result<Value> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
@@ -217,7 +216,7 @@ impl crate::etl::processor::Processor for DigestProcessor {
Some(v) => {
let result = self.process(v)?;
let output_index = field.target_or_input_field();
val.insert(output_index.to_string(), result);
val.insert(output_index.to_string(), result)?;
}
}
}


@@ -31,7 +31,6 @@ use crate::etl::processor::{
Processor, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, PATTERNS_NAME, PATTERN_NAME,
};
use crate::etl::value::Value;
use crate::etl::PipelineMap;
pub(crate) const PROCESSOR_DISSECT: &str = "dissect";
@@ -601,14 +600,14 @@ impl Processor for DissectProcessor {
self.ignore_missing
}
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
fn exec_mut(&self, mut val: Value) -> Result<Value> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
Some(Value::String(val_str)) => {
let r = self.process(val_str)?;
for (k, v) in r {
val.insert(k, v);
val.insert(k, v)?;
}
}
Some(Value::Null) | None => {


@@ -29,7 +29,6 @@ use crate::etl::value::time::{
SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION,
};
use crate::etl::value::{Timestamp, Value};
use crate::etl::PipelineMap;
pub(crate) const PROCESSOR_EPOCH: &str = "epoch";
const RESOLUTION_NAME: &str = "resolution";
@@ -167,7 +166,7 @@ impl Processor for EpochProcessor {
self.ignore_missing
}
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
fn exec_mut(&self, mut val: Value) -> Result<Value> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
@@ -183,7 +182,7 @@ impl Processor for EpochProcessor {
Some(v) => {
let timestamp = self.parse(v)?;
let output_index = field.target_or_input_field();
val.insert(output_index.to_string(), Value::Timestamp(timestamp));
val.insert(output_index.to_string(), Value::Timestamp(timestamp))?;
}
}
}


@@ -25,7 +25,6 @@ use crate::etl::processor::{
IGNORE_MISSING_NAME, PATTERN_NAME,
};
use crate::etl::value::Value;
use crate::etl::PipelineMap;
pub(crate) const PROCESSOR_GSUB: &str = "gsub";
@@ -118,7 +117,7 @@ impl crate::etl::processor::Processor for GsubProcessor {
self.ignore_missing
}
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
fn exec_mut(&self, mut val: Value) -> Result<Value> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
@@ -134,7 +133,7 @@ impl crate::etl::processor::Processor for GsubProcessor {
Some(v) => {
let result = self.process(v)?;
let output_index = field.target_or_input_field();
val.insert(output_index.to_string(), result);
val.insert(output_index.to_string(), result)?;
}
}
}


@@ -24,7 +24,6 @@ use crate::etl::processor::{
IGNORE_MISSING_NAME, SEPARATOR_NAME,
};
use crate::etl::value::{Array, Value};
use crate::etl::PipelineMap;
pub(crate) const PROCESSOR_JOIN: &str = "join";
@@ -95,14 +94,14 @@ impl Processor for JoinProcessor {
self.ignore_missing
}
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
fn exec_mut(&self, mut val: Value) -> Result<Value> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
Some(Value::Array(arr)) => {
let result = self.process(arr)?;
let output_index = field.target_or_input_field();
val.insert(output_index.to_string(), result);
val.insert(output_index.to_string(), result)?;
}
Some(Value::Null) | None => {
if !self.ignore_missing {


@@ -22,7 +22,7 @@ use crate::etl::field::Fields;
use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME,
};
use crate::{json_to_map, PipelineMap, Processor, Value};
use crate::{json_to_map, Processor, Value};
pub(crate) const PROCESSOR_JSON_PARSE: &str = "json_parse";
@@ -77,7 +77,7 @@ impl JsonParseProcessor {
};
let parsed: serde_json::Value = serde_json::from_str(json_str).context(JsonParseSnafu)?;
match parsed {
serde_json::Value::Object(_) => Ok(Value::Map(json_to_map(parsed)?.into())),
serde_json::Value::Object(_) => Ok(json_to_map(parsed)?),
serde_json::Value::Array(arr) => Ok(Value::Array(arr.try_into()?)),
_ => ProcessorUnsupportedValueSnafu {
processor: self.kind(),
@@ -97,14 +97,14 @@ impl Processor for JsonParseProcessor {
self.ignore_missing
}
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
fn exec_mut(&self, mut val: Value) -> Result<Value> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
Some(v) => {
let processed = self.process_field(v)?;
let output_index = field.target_or_input_field();
val.insert(output_index.to_string(), processed);
val.insert(output_index.to_string(), processed)?;
}
None => {
if !self.ignore_missing {


@@ -21,8 +21,8 @@ use crate::error::{
};
use crate::etl::field::Fields;
use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, PipelineMap, Processor, FIELDS_NAME,
FIELD_NAME, IGNORE_MISSING_NAME, JSON_PATH_NAME, JSON_PATH_RESULT_INDEX_NAME,
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
IGNORE_MISSING_NAME, JSON_PATH_NAME, JSON_PATH_RESULT_INDEX_NAME,
};
use crate::Value;
@@ -125,14 +125,14 @@ impl Processor for JsonPathProcessor {
self.ignore_missing
}
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
fn exec_mut(&self, mut val: Value) -> Result<Value> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
Some(v) => {
let processed = self.process_field(v)?;
let output_index = field.target_or_input_field();
val.insert(output_index.to_string(), processed);
val.insert(output_index.to_string(), processed)?;
}
None => {
if !self.ignore_missing {


@@ -24,7 +24,6 @@ use crate::etl::processor::{
IGNORE_MISSING_NAME, METHOD_NAME,
};
use crate::etl::value::Value;
use crate::etl::PipelineMap;
pub(crate) const PROCESSOR_LETTER: &str = "letter";
@@ -126,14 +125,14 @@ impl Processor for LetterProcessor {
self.ignore_missing
}
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
fn exec_mut(&self, mut val: Value) -> Result<Value> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
Some(Value::String(s)) => {
let result = self.process_field(s)?;
let output_key = field.target_or_input_field();
val.insert(output_key.to_string(), result);
val.insert(output_key.to_string(), result)?;
}
Some(Value::Null) | None => {
if !self.ignore_missing {


@@ -18,6 +18,8 @@ const PATTERNS_NAME: &str = "patterns";
pub(crate) const PROCESSOR_REGEX: &str = "regex";
use std::collections::BTreeMap;
use lazy_static::lazy_static;
use regex::Regex;
use snafu::{OptionExt, ResultExt};
@@ -33,7 +35,6 @@ use crate::etl::processor::{
FIELD_NAME, IGNORE_MISSING_NAME, PATTERN_NAME,
};
use crate::etl::value::Value;
use crate::etl::PipelineMap;
lazy_static! {
static ref GROUPS_NAME_REGEX: Regex = Regex::new(r"\(\?P?<([[:word:]]+)>.+?\)").unwrap();
@@ -167,8 +168,8 @@ impl RegexProcessor {
Ok(())
}
fn process(&self, prefix: &str, val: &str) -> Result<PipelineMap> {
let mut result = PipelineMap::new();
fn process(&self, prefix: &str, val: &str) -> Result<BTreeMap<String, Value>> {
let mut result = BTreeMap::new();
for gr in self.patterns.iter() {
if let Some(captures) = gr.regex.captures(val) {
for group in gr.groups.iter() {
@@ -192,14 +193,14 @@ impl Processor for RegexProcessor {
self.ignore_missing
}
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
fn exec_mut(&self, mut val: Value) -> Result<Value> {
for field in self.fields.iter() {
let index = field.input_field();
let prefix = field.target_or_input_field();
match val.get(index) {
Some(Value::String(s)) => {
let result = self.process(prefix, s)?;
val.extend(result);
val.extend(result.into())?;
}
Some(Value::Null) | None => {
if !self.ignore_missing {
@@ -269,7 +270,7 @@ ignore_missing: false"#;
let cw = "[c=w,n=US_CA_SANJOSE,o=55155]";
let breadcrumbs_str = [cc, cg, co, cp, cw].iter().join(",");
let temporary_map: PipelineMap = [
let temporary_map: BTreeMap<String, Value> = [
("breadcrumbs_parent", Value::String(cc.to_string())),
("breadcrumbs_edge", Value::String(cg.to_string())),
("breadcrumbs_origin", Value::String(co.to_string())),


@@ -15,12 +15,14 @@
use ahash::{HashSet, HashSetExt};
use snafu::OptionExt;
use crate::error::{Error, KeyMustBeStringSnafu, ProcessorUnsupportedValueSnafu, Result};
use crate::error::{
Error, KeyMustBeStringSnafu, ProcessorUnsupportedValueSnafu, Result, ValueMustBeMapSnafu,
};
use crate::etl::field::Fields;
use crate::etl::processor::{
yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME, TYPE_NAME,
};
use crate::{PipelineMap, Processor};
use crate::{Processor, Value};
pub(crate) const PROCESSOR_SELECT: &str = "select";
const INCLUDE_KEY: &str = "include";
@@ -96,27 +98,29 @@ impl Processor for SelectProcessor {
true
}
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
fn exec_mut(&self, mut val: Value) -> Result<Value> {
let v_map = val.as_map_mut().context(ValueMustBeMapSnafu)?;
match self.select_type {
SelectType::Include => {
let mut include_key_set = HashSet::with_capacity(val.len());
let mut include_key_set = HashSet::with_capacity(v_map.len());
for field in self.fields.iter() {
// If the field has a target, move the value to the target
let field_name = field.input_field();
if let Some(target_name) = field.target_field() {
if let Some(v) = val.remove(field_name) {
val.insert(target_name.to_string(), v);
if let Some(v) = v_map.remove(field_name) {
v_map.insert(target_name.to_string(), v);
}
include_key_set.insert(target_name);
} else {
include_key_set.insert(field_name);
}
}
val.retain(|k, _| include_key_set.contains(k.as_str()));
v_map.retain(|k, _| include_key_set.contains(k.as_str()));
}
SelectType::Exclude => {
for field in self.fields.iter() {
val.remove(field.input_field());
v_map.remove(field.input_field());
}
}
}
@@ -127,9 +131,11 @@ impl Processor for SelectProcessor {
#[cfg(test)]
mod test {
use std::collections::BTreeMap;
use crate::etl::field::{Field, Fields};
use crate::etl::processor::select::{SelectProcessor, SelectType};
use crate::{PipelineMap, Processor, Value};
use crate::{Map, Processor, Value};
#[test]
fn test_select() {
@@ -138,13 +144,14 @@ mod test {
select_type: SelectType::Include,
};
let mut p = PipelineMap::new();
let mut p = BTreeMap::new();
p.insert("hello".to_string(), Value::String("world".to_string()));
p.insert("hello2".to_string(), Value::String("world2".to_string()));
let result = processor.exec_mut(p);
let result = processor.exec_mut(Value::Map(Map { values: p }));
assert!(result.is_ok());
let p = result.unwrap();
let mut result = result.unwrap();
let p = result.as_map_mut().unwrap();
assert_eq!(p.len(), 1);
assert_eq!(p.get("hello"), Some(&Value::String("world".to_string())));
}
@@ -156,13 +163,14 @@ mod test {
select_type: SelectType::Include,
};
let mut p = PipelineMap::new();
let mut p = BTreeMap::new();
p.insert("hello".to_string(), Value::String("world".to_string()));
p.insert("hello2".to_string(), Value::String("world2".to_string()));
let result = processor.exec_mut(p);
let result = processor.exec_mut(Value::Map(Map { values: p }));
assert!(result.is_ok());
let p = result.unwrap();
let mut result = result.unwrap();
let p = result.as_map_mut().unwrap();
assert_eq!(p.len(), 1);
assert_eq!(p.get("hello3"), Some(&Value::String("world".to_string())));
}
@@ -174,13 +182,14 @@ mod test {
select_type: SelectType::Exclude,
};
let mut p = PipelineMap::new();
let mut p = BTreeMap::new();
p.insert("hello".to_string(), Value::String("world".to_string()));
p.insert("hello2".to_string(), Value::String("world2".to_string()));
let result = processor.exec_mut(p);
let result = processor.exec_mut(Value::Map(Map { values: p }));
assert!(result.is_ok());
let p = result.unwrap();
let mut result = result.unwrap();
let p = result.as_map_mut().unwrap();
assert_eq!(p.len(), 1);
assert_eq!(p.get("hello"), None);
assert_eq!(p.get("hello2"), Some(&Value::String("world2".to_string())));


@@ -20,7 +20,7 @@ use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
IGNORE_MISSING_NAME, KEY_NAME,
};
use crate::{PipelineMap, Processor, Value};
use crate::{Processor, Value};
pub(crate) const PROCESSOR_SIMPLE_EXTRACT: &str = "simple_extract";
@@ -98,14 +98,14 @@ impl Processor for SimpleExtractProcessor {
self.ignore_missing
}
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
fn exec_mut(&self, mut val: Value) -> Result<Value> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
Some(v) => {
let processed = self.process_field(v)?;
let output_index = field.target_or_input_field();
val.insert(output_index.to_string(), processed);
val.insert(output_index.to_string(), processed)?;
}
None => {
if !self.ignore_missing {


@@ -36,7 +36,6 @@ use crate::etl::value::time::{
SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION,
};
use crate::etl::value::{Timestamp, Value};
use crate::etl::PipelineMap;
pub(crate) const PROCESSOR_TIMESTAMP: &str = "timestamp";
const RESOLUTION_NAME: &str = "resolution";
@@ -302,7 +301,7 @@ impl Processor for TimestampProcessor {
self.ignore_missing
}
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
fn exec_mut(&self, mut val: Value) -> Result<Value> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
@@ -318,7 +317,7 @@ impl Processor for TimestampProcessor {
Some(v) => {
let result = self.parse(v)?;
let output_key = field.target_or_input_field();
val.insert(output_key.to_string(), Value::Timestamp(result));
val.insert(output_key.to_string(), Value::Timestamp(result))?;
}
}
}


@@ -25,7 +25,6 @@ use crate::etl::processor::{
IGNORE_MISSING_NAME, METHOD_NAME,
};
use crate::etl::value::Value;
use crate::PipelineMap;
pub(crate) const PROCESSOR_URL_ENCODING: &str = "urlencoding";
@@ -126,14 +125,14 @@ impl crate::etl::processor::Processor for UrlEncodingProcessor {
self.ignore_missing
}
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
fn exec_mut(&self, mut val: Value) -> Result<Value> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
Some(Value::String(s)) => {
let result = self.process_field(s)?;
let output_index = field.target_or_input_field();
val.insert(output_index.to_string(), result);
val.insert(output_index.to_string(), result)?;
}
Some(Value::Null) | None => {
if !self.ignore_missing {


@@ -27,7 +27,7 @@ use crate::error::{
InvalidTimestampSnafu, KeyMustBeStringSnafu, Result, VrlRegexValueSnafu, VrlReturnValueSnafu,
};
use crate::etl::processor::yaml_string;
use crate::{PipelineMap, Value as PipelineValue};
use crate::Value as PipelineValue;
pub(crate) const PROCESSOR_VRL: &str = "vrl";
const SOURCE: &str = "source";
@@ -62,14 +62,11 @@ impl VrlProcessor {
Ok(Self { source, program })
}
pub fn resolve(&self, m: PipelineMap) -> Result<PipelineValue> {
let pipeline_vrl = m
.into_iter()
.map(|(k, v)| pipeline_value_to_vrl_value(v).map(|v| (KeyString::from(k), v)))
.collect::<Result<BTreeMap<_, _>>>()?;
pub fn resolve(&self, m: PipelineValue) -> Result<PipelineValue> {
let pipeline_vrl = pipeline_value_to_vrl_value(m)?;
let mut target = TargetValue {
value: VrlValue::Object(pipeline_vrl),
value: pipeline_vrl,
metadata: VrlValue::Object(BTreeMap::new()),
secrets: Secrets::default(),
};
@@ -116,11 +113,11 @@ impl crate::etl::processor::Processor for VrlProcessor {
true
}
fn exec_mut(&self, val: PipelineMap) -> Result<PipelineMap> {
fn exec_mut(&self, val: PipelineValue) -> Result<PipelineValue> {
let val = self.resolve(val)?;
if let PipelineValue::Map(m) = val {
Ok(m.values)
Ok(PipelineValue::Map(m.values.into()))
} else {
VrlRegexValueSnafu.fail()
}
@@ -244,19 +241,19 @@ del(.user_info)
assert!(v.is_ok());
let v = v.unwrap();
let mut n = PipelineMap::new();
let mut n = BTreeMap::new();
n.insert(
"name".to_string(),
PipelineValue::String("certain_name".to_string()),
);
let mut m = PipelineMap::new();
let mut m = BTreeMap::new();
m.insert(
"user_info".to_string(),
PipelineValue::Map(Map { values: n }),
);
let re = v.resolve(m);
let re = v.resolve(PipelineValue::Map(Map { values: m }));
assert!(re.is_ok());
let re = re.unwrap();


@@ -14,7 +14,7 @@
pub mod coerce;
use std::collections::HashSet;
use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;
use ahash::{HashMap, HashMapExt};
@@ -29,19 +29,19 @@ use itertools::Itertools;
use once_cell::sync::OnceCell;
use serde_json::Number;
use session::context::Channel;
use snafu::OptionExt;
use crate::error::{
IdentifyPipelineColumnTypeMismatchSnafu, ReachedMaxNestedLevelsSnafu, Result,
TransformColumnNameMustBeUniqueSnafu, TransformMultipleTimestampIndexSnafu,
TransformTimestampIndexCountSnafu, UnsupportedNumberTypeSnafu,
TransformTimestampIndexCountSnafu, UnsupportedNumberTypeSnafu, ValueMustBeMapSnafu,
};
use crate::etl::ctx_req::ContextOpt;
use crate::etl::field::{Field, Fields};
use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transforms};
use crate::etl::value::{Timestamp, Value};
use crate::etl::PipelineMap;
use crate::PipelineContext;
use crate::{Map, PipelineContext};
const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;
@@ -186,8 +186,8 @@ impl GreptimeTransformer {
}
}
pub fn transform_mut(&self, pipeline_map: &mut PipelineMap) -> Result<(ContextOpt, Row)> {
let opt = ContextOpt::from_pipeline_map_to_opt(pipeline_map);
pub fn transform_mut(&self, pipeline_map: &mut Value) -> Result<(ContextOpt, Row)> {
let opt = ContextOpt::from_pipeline_map_to_opt(pipeline_map)?;
let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
let mut output_index = 0;
@@ -337,7 +337,7 @@ fn resolve_number_schema(
)
}
fn calc_ts(p_ctx: &PipelineContext, values: &PipelineMap) -> Result<Option<ValueData>> {
fn calc_ts(p_ctx: &PipelineContext, values: &Value) -> Result<Option<ValueData>> {
match p_ctx.channel {
Channel::Prometheus => Ok(Some(ValueData::TimestampMillisecondValue(
values
@@ -362,7 +362,7 @@ fn calc_ts(p_ctx: &PipelineContext, values: &PipelineMap) -> Result<Option<Value
fn values_to_row(
schema_info: &mut SchemaInfo,
values: PipelineMap,
values: Value,
pipeline_ctx: &PipelineContext<'_>,
) -> Result<Row> {
let mut row: Vec<GreptimeValue> = Vec::with_capacity(schema_info.schema.len());
@@ -382,6 +382,8 @@ fn values_to_row(
.as_ref()
.map_or(DEFAULT_GREPTIME_TIMESTAMP_COLUMN, |ts| ts.get_column_name());
let values = values.into_map().context(ValueMustBeMapSnafu)?;
for (column_name, value) in values {
if column_name == ts_column_name {
continue;
@@ -518,7 +520,7 @@ fn resolve_value(
}
fn identity_pipeline_inner(
pipeline_maps: Vec<PipelineMap>,
pipeline_maps: Vec<Value>,
pipeline_ctx: &PipelineContext<'_>,
) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
let mut schema_info = SchemaInfo::default();
@@ -545,7 +547,7 @@ fn identity_pipeline_inner(
let len = pipeline_maps.len();
for mut pipeline_map in pipeline_maps {
let opt = ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map);
let opt = ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map)?;
let row = values_to_row(&mut schema_info, pipeline_map, pipeline_ctx)?;
opt_map
@@ -576,7 +578,7 @@ fn identity_pipeline_inner(
/// 4. The pipeline will return an error if the same column datatype is mismatched
/// 5. The pipeline will analyze the schema of each json record and merge them to get the final schema.
pub fn identity_pipeline(
array: Vec<PipelineMap>,
array: Vec<Value>,
table: Option<Arc<table::Table>>,
pipeline_ctx: &PipelineContext<'_>,
) -> Result<HashMap<ContextOpt, Rows>> {
@@ -584,7 +586,7 @@ pub fn identity_pipeline(
array
.into_iter()
.map(|item| flatten_object(item, DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING))
.collect::<Result<Vec<PipelineMap>>>()?
.collect::<Result<Vec<Value>>>()?
} else {
array
};
@@ -618,21 +620,22 @@ pub fn identity_pipeline(
///
/// The `max_nested_levels` parameter is used to limit the nested levels of the JSON object.
/// An error will be returned if the nesting depth is greater than `max_nested_levels`.
pub fn flatten_object(object: PipelineMap, max_nested_levels: usize) -> Result<PipelineMap> {
let mut flattened = PipelineMap::new();
pub fn flatten_object(object: Value, max_nested_levels: usize) -> Result<Value> {
let mut flattened = BTreeMap::new();
let object = object.into_map().context(ValueMustBeMapSnafu)?;
if !object.is_empty() {
// it will use recursion to flatten the object.
do_flatten_object(&mut flattened, None, object, 1, max_nested_levels)?;
}
Ok(flattened)
Ok(Value::Map(Map { values: flattened }))
}
fn do_flatten_object(
dest: &mut PipelineMap,
dest: &mut BTreeMap<String, Value>,
base: Option<&str>,
object: PipelineMap,
object: BTreeMap<String, Value>,
current_level: usize,
max_nested_levels: usize,
) -> Result<()> {


@@ -16,6 +16,7 @@ pub mod array;
pub mod map;
pub mod time;
use std::collections::BTreeMap;
use std::result::Result as StdResult;
pub use array::Array;
@@ -30,15 +31,16 @@ pub use time::Timestamp;
use crate::error::{
Error, Result, UnsupportedNumberTypeSnafu, ValueDefaultValueUnsupportedSnafu,
ValueInvalidResolutionSnafu, ValueParseBooleanSnafu, ValueParseFloatSnafu, ValueParseIntSnafu,
ValueParseTypeSnafu, ValueUnsupportedYamlTypeSnafu, ValueYamlKeyMustBeStringSnafu,
ValueInvalidResolutionSnafu, ValueMustBeMapSnafu, ValueParseBooleanSnafu, ValueParseFloatSnafu,
ValueParseIntSnafu, ValueParseTypeSnafu, ValueUnsupportedYamlTypeSnafu,
ValueYamlKeyMustBeStringSnafu,
};
use crate::etl::PipelineMap;
pub type PipelineMap = Value;
/// Value can be used as type
/// acts as value: the enclosed value is the actual value
/// acts as type: the enclosed value is the default value
#[derive(Debug, Clone, PartialEq, Default)]
pub enum Value {
// as value: null
@@ -70,6 +72,47 @@ pub enum Value {
}
impl Value {
pub fn get(&self, key: &str) -> Option<&Self> {
match self {
Value::Map(map) => map.get(key),
_ => None,
}
}
pub fn get_mut(&mut self, key: &str) -> Option<&mut Self> {
match self {
Value::Map(map) => map.get_mut(key),
_ => None,
}
}
pub fn remove(&mut self, key: &str) -> Option<Value> {
match self {
Value::Map(map) => map.remove(key),
_ => None,
}
}
pub fn extend(&mut self, other: Map) -> Result<()> {
match self {
Value::Map(map) => {
map.extend(other);
Ok(())
}
_ => ValueMustBeMapSnafu.fail(),
}
}
pub fn insert(&mut self, key: String, value: Value) -> Result<()> {
match self {
Value::Map(map) => {
map.insert(key, value);
Ok(())
}
_ => ValueMustBeMapSnafu.fail(),
}
}
pub fn is_null(&self) -> bool {
matches!(self, Value::Null)
}
@@ -236,13 +279,6 @@ impl Value {
}
}
pub fn get(&self, key: &str) -> Option<&Self> {
match self {
Value::Map(map) => map.get(key),
_ => None,
}
}
pub fn as_str(&self) -> Option<&str> {
match self {
Value::String(v) => Some(v),
@@ -289,6 +325,20 @@ impl Value {
}
}
pub fn as_map_mut(&mut self) -> Option<&mut BTreeMap<String, Self>> {
match self {
Value::Map(map) => Some(map),
_ => None,
}
}
pub fn into_map(self) -> Option<BTreeMap<String, Self>> {
match self {
Value::Map(map) => Some(map.values),
_ => None,
}
}
// ref https://github.com/serde-rs/json/blob/master/src/value/mod.rs#L779
pub fn pointer(&self, pointer: &str) -> Option<&Value> {
if pointer.is_empty() {
@@ -388,7 +438,7 @@ impl TryFrom<simd_json::value::OwnedValue> for Value {
Ok(Value::Array(Array { values: re }))
}
simd_json::OwnedValue::Object(map) => {
let mut values = PipelineMap::new();
let mut values = BTreeMap::new();
for (k, v) in map.into_iter() {
values.insert(k, Value::try_from(v)?);
}
@@ -425,7 +475,7 @@ impl TryFrom<serde_json::Value> for Value {
Ok(Value::Array(Array { values }))
}
serde_json::Value::Object(v) => {
let mut values = PipelineMap::new();
let mut values = BTreeMap::new();
for (k, v) in v {
values.insert(k, Value::try_from(v)?);
}
@@ -456,7 +506,7 @@ impl TryFrom<&yaml_rust::Yaml> for Value {
Ok(Value::Array(Array { values }))
}
yaml_rust::Yaml::Hash(v) => {
let mut values = PipelineMap::new();
let mut values = BTreeMap::new();
for (k, v) in v {
let key = k
.as_str()
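
The map-style helpers added to Value above are what let the processors keep their old BTreeMap-shaped code with minimal churn: reads still return Option, while writes return Result and fail with ValueMustBeMap when the Value is not a map. A short usage sketch under those assumptions; the function and field names are illustrative:

fn uppercase_message(val: &mut Value) -> Result<()> {
    // Read access borrows immutably and returns Option, as it did on the BTreeMap.
    let upper = match val.get("message") {
        Some(Value::String(s)) => Some(Value::String(s.to_uppercase())),
        _ => None,
    };
    // Write access now returns Result and errors on non-map Values.
    if let Some(u) = upper {
        val.insert("message_upper".to_string(), u)?;
    }
    Ok(())
}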


@@ -12,12 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use crate::etl::value::Value;
use crate::PipelineMap;
#[derive(Debug, Clone, PartialEq, Default)]
pub struct Map {
pub values: PipelineMap,
pub values: BTreeMap<String, Value>,
}
impl Map {
@@ -36,14 +37,14 @@ impl Map {
}
}
impl From<PipelineMap> for Map {
fn from(values: PipelineMap) -> Self {
impl From<BTreeMap<String, Value>> for Map {
fn from(values: BTreeMap<String, Value>) -> Self {
Self { values }
}
}
impl std::ops::Deref for Map {
type Target = PipelineMap;
type Target = BTreeMap<String, Value>;
fn deref(&self) -> &Self::Target {
&self.values


@@ -27,8 +27,7 @@ pub use etl::transform::GreptimeTransformer;
pub use etl::value::{Array, Map, Value};
pub use etl::{
json_array_to_map, json_to_map, parse, simd_json_array_to_map, simd_json_to_map,
AutoTransformOutput, Content, DispatchedTo, Pipeline, PipelineExecOutput, PipelineMap,
TransformedOutput,
AutoTransformOutput, Content, DispatchedTo, Pipeline, PipelineExecOutput, TransformedOutput,
};
pub use manager::{
pipeline_operator, table, util, IdentityTimeIndex, PipelineContext, PipelineDefinition,


@@ -20,7 +20,7 @@ use yaml_rust::Yaml;
use crate::error::{
Error, InvalidTableSuffixTemplateSnafu, RequiredTableSuffixTemplateSnafu, Result,
};
use crate::{PipelineMap, Value};
use crate::Value;
const REPLACE_KEY: &str = "{}";
@@ -47,7 +47,7 @@ pub(crate) struct TableSuffixTemplate {
}
impl TableSuffixTemplate {
pub fn apply(&self, val: &PipelineMap) -> Option<String> {
pub fn apply(&self, val: &Value) -> Option<String> {
let values = self
.keys
.iter()


@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::fmt::Display;
use std::io::BufRead;
use std::str::FromStr;
@@ -34,10 +35,10 @@ use headers::ContentType;
use lazy_static::lazy_static;
use pipeline::util::to_pipeline_version;
use pipeline::{
ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, PipelineMap,
ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, Value as PipelineValue,
};
use serde::{Deserialize, Serialize};
use serde_json::{json, Deserializer, Map, Value};
use serde_json::{json, Deserializer, Map, Value as JsonValue};
use session::context::{Channel, QueryContext, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use strum::{EnumIter, IntoEnumIterator};
@@ -106,7 +107,7 @@ pub(crate) struct PipelineIngestRequest {
/// The table where the log data will be written to.
pub table: String,
/// The log data to be ingested.
pub values: Vec<PipelineMap>,
pub values: Vec<PipelineValue>,
}
pub struct PipelineContent(String);
@@ -284,18 +285,18 @@ pub async fn delete_pipeline(
/// Transform NDJSON array into a single array
/// always return an array
fn transform_ndjson_array_factory(
values: impl IntoIterator<Item = Result<Value, serde_json::Error>>,
values: impl IntoIterator<Item = Result<JsonValue, serde_json::Error>>,
ignore_error: bool,
) -> Result<Vec<Value>> {
) -> Result<Vec<JsonValue>> {
values
.into_iter()
.try_fold(Vec::with_capacity(100), |mut acc_array, item| match item {
Ok(item_value) => {
match item_value {
Value::Array(item_array) => {
JsonValue::Array(item_array) => {
acc_array.extend(item_array);
}
Value::Object(_) => {
JsonValue::Object(_) => {
acc_array.push(item_value);
}
_ => {
@@ -320,7 +321,7 @@ fn transform_ndjson_array_factory(
/// Dryrun pipeline with given data
async fn dryrun_pipeline_inner(
value: Vec<PipelineMap>,
value: Vec<PipelineValue>,
pipeline: Arc<pipeline::Pipeline>,
pipeline_handler: PipelineHandlerRef,
query_ctx: &QueryContextRef,
@@ -356,24 +357,27 @@ async fn dryrun_pipeline_inner(
.iter()
.map(|cs| {
let mut map = Map::new();
map.insert(name_key.to_string(), Value::String(cs.column_name.clone()));
map.insert(
name_key.to_string(),
JsonValue::String(cs.column_name.clone()),
);
map.insert(
data_type_key.to_string(),
Value::String(cs.datatype().as_str_name().to_string()),
JsonValue::String(cs.datatype().as_str_name().to_string()),
);
map.insert(
colume_type_key.to_string(),
Value::String(cs.semantic_type().as_str_name().to_string()),
JsonValue::String(cs.semantic_type().as_str_name().to_string()),
);
map.insert(
"fulltext".to_string(),
Value::Bool(
JsonValue::Bool(
cs.options
.clone()
.is_some_and(|x| x.options.contains_key("fulltext")),
),
);
Value::Object(map)
JsonValue::Object(map)
})
.collect::<Vec<_>>();
@@ -401,26 +405,26 @@ async fn dryrun_pipeline_inner(
"data_type".to_string(),
schema[idx][data_type_key].clone(),
);
Value::Object(map)
JsonValue::Object(map)
})
.unwrap_or(Value::Null)
.unwrap_or(JsonValue::Null)
})
.collect()
})
.collect();
let mut result = Map::new();
result.insert("schema".to_string(), Value::Array(schema));
result.insert("rows".to_string(), Value::Array(rows));
result.insert("table_name".to_string(), Value::String(table_name));
let result = Value::Object(result);
result.insert("schema".to_string(), JsonValue::Array(schema));
result.insert("rows".to_string(), JsonValue::Array(rows));
result.insert("table_name".to_string(), JsonValue::String(table_name));
let result = JsonValue::Object(result);
Some(result)
} else {
None
}
})
.collect();
Ok(Json(Value::Array(results)).into_response())
Ok(Json(JsonValue::Array(results)).into_response())
}
/// Dryrun pipeline with given data
@@ -480,7 +484,7 @@ fn add_step_info_for_pipeline_dryrun_error(step_msg: &str, e: Error) -> Response
/// Parse the data with given content type
/// If the content type is invalid, return error
/// content type is one of application/json, text/plain, application/x-ndjson
fn parse_dryrun_data(data_type: String, data: String) -> Result<Vec<PipelineMap>> {
fn parse_dryrun_data(data_type: String, data: String) -> Result<Vec<PipelineValue>> {
if let Ok(content_type) = ContentType::from_str(&data_type) {
extract_pipeline_value_by_content_type(content_type, Bytes::from(data), false)
} else {
@@ -709,7 +713,7 @@ impl<'a> TryFrom<&'a ContentType> for EventPayloadResolver<'a> {
}
impl EventPayloadResolver<'_> {
fn parse_payload(&self, payload: Bytes, ignore_errors: bool) -> Result<Vec<PipelineMap>> {
fn parse_payload(&self, payload: Bytes, ignore_errors: bool) -> Result<Vec<PipelineValue>> {
match self.inner {
EventPayloadResolverInner::Json => {
pipeline::json_array_to_map(transform_ndjson_array_factory(
@@ -754,9 +758,9 @@ impl EventPayloadResolver<'_> {
.lines()
.filter_map(|line| line.ok().filter(|line| !line.is_empty()))
.map(|line| {
let mut map = PipelineMap::new();
map.insert("message".to_string(), pipeline::Value::String(line));
map
let mut map = BTreeMap::new();
map.insert("message".to_string(), PipelineValue::String(line));
PipelineValue::Map(map.into())
})
.collect::<Vec<_>>();
Ok(result)
@@ -769,7 +773,7 @@ fn extract_pipeline_value_by_content_type(
content_type: ContentType,
payload: Bytes,
ignore_errors: bool,
) -> Result<Vec<PipelineMap>> {
) -> Result<Vec<PipelineValue>> {
EventPayloadResolver::try_from(&content_type).and_then(|resolver| {
resolver
.parse_payload(payload, ignore_errors)
@@ -878,28 +882,28 @@ mod tests {
#[test]
fn test_transform_ndjson() {
let s = "{\"a\": 1}\n{\"b\": 2}";
let a = Value::Array(
let a = JsonValue::Array(
transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
)
.to_string();
assert_eq!(a, "[{\"a\":1},{\"b\":2}]");
let s = "{\"a\": 1}";
let a = Value::Array(
let a = JsonValue::Array(
transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
)
.to_string();
assert_eq!(a, "[{\"a\":1}]");
let s = "[{\"a\": 1}]";
let a = Value::Array(
let a = JsonValue::Array(
transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
)
.to_string();
assert_eq!(a, "[{\"a\":1}]");
let s = "[{\"a\": 1}, {\"b\": 2}]";
let a = Value::Array(
let a = JsonValue::Array(
transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
)
.to_string();
@@ -928,10 +932,12 @@ mod tests {
extract_pipeline_value_by_content_type(NDJSON_CONTENT_TYPE.clone(), payload, true);
assert!(fail_only_wrong.is_ok());
let mut map1 = PipelineMap::new();
map1.insert("a".to_string(), pipeline::Value::Uint64(1));
let mut map2 = PipelineMap::new();
map2.insert("c".to_string(), pipeline::Value::Uint64(1));
let mut map1 = BTreeMap::new();
map1.insert("a".to_string(), PipelineValue::Uint64(1));
let map1 = PipelineValue::Map(map1.into());
let mut map2 = BTreeMap::new();
map2.insert("c".to_string(), PipelineValue::Uint64(1));
let map2 = PipelineValue::Map(map2.into());
assert_eq!(fail_only_wrong.unwrap(), vec![map1, map2]);
}
}


@@ -23,7 +23,7 @@ use common_error::ext::ErrorExt;
use common_query::Output;
use datafusion_expr::LogicalPlan;
use log_query::LogQuery;
use pipeline::PipelineMap;
use pipeline::Value;
use query::parser::PromQuery;
use session::context::QueryContextRef;
use sql::statements::statement::Statement;
@@ -385,9 +385,9 @@ pub trait LogIngestInterceptor {
/// Called before pipeline execution.
fn pre_pipeline(
&self,
values: Vec<PipelineMap>,
values: Vec<Value>,
_query_ctx: QueryContextRef,
) -> Result<Vec<PipelineMap>, Self::Error> {
) -> Result<Vec<Value>, Self::Error> {
Ok(values)
}
@@ -412,9 +412,9 @@ where
fn pre_pipeline(
&self,
values: Vec<PipelineMap>,
values: Vec<Value>,
query_ctx: QueryContextRef,
) -> Result<Vec<PipelineMap>, Self::Error> {
) -> Result<Vec<Value>, Self::Error> {
if let Some(this) = self {
this.pre_pipeline(values, query_ctx)
} else {


@@ -21,7 +21,7 @@ use itertools::Itertools;
use pipeline::error::AutoTransformOneTimestampSnafu;
use pipeline::{
AutoTransformOutput, ContextReq, DispatchedTo, IdentityTimeIndex, Pipeline, PipelineContext,
PipelineDefinition, PipelineExecOutput, PipelineMap, TransformedOutput,
PipelineDefinition, PipelineExecOutput, TransformedOutput, Value,
GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
};
use session::context::{Channel, QueryContextRef};
@@ -116,7 +116,7 @@ async fn run_custom_pipeline(
} = pipeline_req;
let arr_len = pipeline_maps.len();
let mut transformed_map = HashMap::new();
let mut dispatched: BTreeMap<DispatchedTo, Vec<PipelineMap>> = BTreeMap::new();
let mut dispatched: BTreeMap<DispatchedTo, Vec<Value>> = BTreeMap::new();
let mut auto_map = HashMap::new();
let mut auto_map_ts_keys = HashMap::new();


@@ -22,9 +22,7 @@ use api::v1::RowInsertRequest;
use bytes::{Buf, Bytes};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_telemetry::debug;
use pipeline::{
ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, PipelineMap, Value,
};
use pipeline::{ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, Value};
use prost::encoding::message::merge;
use prost::encoding::{decode_key, decode_varint, WireType};
use prost::DecodeError;
@@ -338,7 +336,7 @@ impl PromWriteRequest {
/// let's keep it that way for now.
pub struct PromSeriesProcessor {
pub(crate) use_pipeline: bool,
pub(crate) table_values: BTreeMap<String, Vec<PipelineMap>>,
pub(crate) table_values: BTreeMap<String, Vec<Value>>,
// optional fields for pipeline
pub(crate) pipeline_handler: Option<PipelineHandlerRef>,
@@ -374,8 +372,8 @@ impl PromSeriesProcessor {
&mut self,
series: &mut PromTimeSeries,
) -> Result<(), DecodeError> {
let mut vec_pipeline_map: Vec<PipelineMap> = Vec::new();
let mut pipeline_map = PipelineMap::new();
let mut vec_pipeline_map: Vec<Value> = Vec::new();
let mut pipeline_map = BTreeMap::new();
for l in series.labels.iter() {
let name = String::from_utf8(l.name.to_vec())
.map_err(|_| DecodeError::new("invalid utf-8"))?;
@@ -391,10 +389,10 @@ impl PromSeriesProcessor {
pipeline_map.insert(GREPTIME_TIMESTAMP.to_string(), Value::Int64(timestamp));
pipeline_map.insert(GREPTIME_VALUE.to_string(), Value::Float64(s.value));
if one_sample {
vec_pipeline_map.push(pipeline_map);
vec_pipeline_map.push(Value::Map(pipeline_map.into()));
break;
} else {
vec_pipeline_map.push(pipeline_map.clone());
vec_pipeline_map.push(Value::Map(pipeline_map.clone().into()));
}
}