mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-24 08:50:40 +00:00
chore: add json path for pipeline (#4925)
* chore: add json path for pipeline * chore: change jsonpath lib verion * chore: remove useless doc * chore: fix json path test * chore: fix pipeline json path test
This commit is contained in:
16
Cargo.lock
generated
16
Cargo.lock
generated
@@ -5661,6 +5661,19 @@ dependencies = [
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jsonpath-rust"
|
||||
version = "0.7.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "69a61b87f6a55cc6c28fed5739dd36b9642321ce63e4a5e4a4715d69106f4a10"
|
||||
dependencies = [
|
||||
"pest",
|
||||
"pest_derive",
|
||||
"regex",
|
||||
"serde_json",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jsonptr"
|
||||
version = "0.4.7"
|
||||
@@ -5771,7 +5784,7 @@ dependencies = [
|
||||
"hyper-rustls",
|
||||
"hyper-timeout 0.5.1",
|
||||
"hyper-util",
|
||||
"jsonpath-rust",
|
||||
"jsonpath-rust 0.5.1",
|
||||
"k8s-openapi",
|
||||
"kube-core",
|
||||
"pem 3.0.4",
|
||||
@@ -8310,6 +8323,7 @@ dependencies = [
|
||||
"greptime-proto",
|
||||
"itertools 0.10.5",
|
||||
"jsonb",
|
||||
"jsonpath-rust 0.7.3",
|
||||
"lazy_static",
|
||||
"moka",
|
||||
"once_cell",
|
||||
|
||||
@@ -41,6 +41,7 @@ futures.workspace = true
|
||||
greptime-proto.workspace = true
|
||||
itertools.workspace = true
|
||||
jsonb.workspace = true
|
||||
jsonpath-rust = "0.7.3"
|
||||
lazy_static.workspace = true
|
||||
moka = { workspace = true, features = ["sync"] }
|
||||
once_cell.workspace = true
|
||||
|
||||
@@ -570,6 +570,18 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
#[snafu(display("Parse json path error"))]
|
||||
JsonPathParse {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
#[snafu(source)]
|
||||
error: jsonpath_rust::JsonPathParserError,
|
||||
},
|
||||
#[snafu(display("Json path result index not number"))]
|
||||
JsonPathParseResultIndex {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
@@ -19,6 +19,7 @@ pub mod dissect;
|
||||
pub mod epoch;
|
||||
pub mod gsub;
|
||||
pub mod join;
|
||||
pub mod json_path;
|
||||
pub mod letter;
|
||||
pub mod regex;
|
||||
pub mod timestamp;
|
||||
@@ -34,6 +35,7 @@ use epoch::{EpochProcessor, EpochProcessorBuilder};
|
||||
use gsub::{GsubProcessor, GsubProcessorBuilder};
|
||||
use itertools::Itertools;
|
||||
use join::{JoinProcessor, JoinProcessorBuilder};
|
||||
use json_path::{JsonPathProcessor, JsonPathProcessorBuilder};
|
||||
use letter::{LetterProcessor, LetterProcessorBuilder};
|
||||
use regex::{RegexProcessor, RegexProcessorBuilder};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
@@ -56,6 +58,8 @@ const PATTERN_NAME: &str = "pattern";
|
||||
const PATTERNS_NAME: &str = "patterns";
|
||||
const SEPARATOR_NAME: &str = "separator";
|
||||
const TARGET_FIELDS_NAME: &str = "target_fields";
|
||||
const JSON_PATH_NAME: &str = "json_path";
|
||||
const JSON_PATH_RESULT_INDEX_NAME: &str = "result_index";
|
||||
|
||||
// const IF_NAME: &str = "if";
|
||||
// const IGNORE_FAILURE_NAME: &str = "ignore_failure";
|
||||
@@ -94,6 +98,7 @@ pub enum ProcessorKind {
|
||||
UrlEncoding(UrlEncodingProcessor),
|
||||
Epoch(EpochProcessor),
|
||||
Date(DateProcessor),
|
||||
JsonPath(JsonPathProcessor),
|
||||
}
|
||||
|
||||
/// ProcessorBuilder trait defines the interface for all processor builders
|
||||
@@ -122,6 +127,7 @@ pub enum ProcessorBuilders {
|
||||
UrlEncoding(UrlEncodingProcessorBuilder),
|
||||
Epoch(EpochProcessorBuilder),
|
||||
Date(DateProcessorBuilder),
|
||||
JsonPath(JsonPathProcessorBuilder),
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
@@ -266,6 +272,9 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorBuilders> {
|
||||
urlencoding::PROCESSOR_URL_ENCODING => {
|
||||
ProcessorBuilders::UrlEncoding(UrlEncodingProcessorBuilder::try_from(value)?)
|
||||
}
|
||||
json_path::PROCESSOR_JSON_PATH => {
|
||||
ProcessorBuilders::JsonPath(json_path::JsonPathProcessorBuilder::try_from(value)?)
|
||||
}
|
||||
_ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
|
||||
};
|
||||
|
||||
|
||||
231
src/pipeline/src/etl/processor/json_path.rs
Normal file
231
src/pipeline/src/etl/processor/json_path.rs
Normal file
@@ -0,0 +1,231 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use ahash::HashSet;
|
||||
use jsonpath_rust::JsonPath;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use super::{
|
||||
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, ProcessorBuilder,
|
||||
FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, JSON_PATH_NAME, JSON_PATH_RESULT_INDEX_NAME,
|
||||
};
|
||||
use crate::etl::error::{Error, Result};
|
||||
use crate::etl::field::{Fields, OneInputOneOutputField};
|
||||
use crate::etl::processor::ProcessorKind;
|
||||
use crate::etl_error::{
|
||||
JsonPathParseResultIndexSnafu, JsonPathParseSnafu, KeyMustBeStringSnafu,
|
||||
ProcessorMissingFieldSnafu,
|
||||
};
|
||||
use crate::Value;
|
||||
|
||||
pub(crate) const PROCESSOR_JSON_PATH: &str = "json_path";
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct JsonPathProcessorBuilder {
|
||||
fields: Fields,
|
||||
json_path: JsonPath<Value>,
|
||||
ignore_missing: bool,
|
||||
result_idex: Option<usize>,
|
||||
}
|
||||
|
||||
impl JsonPathProcessorBuilder {
|
||||
fn build(self, intermediate_keys: &[String]) -> Result<JsonPathProcessor> {
|
||||
let mut real_fields = vec![];
|
||||
for field in self.fields.into_iter() {
|
||||
let input = OneInputOneOutputField::build(
|
||||
JSON_PATH_NAME,
|
||||
intermediate_keys,
|
||||
field.input_field(),
|
||||
field.target_or_input_field(),
|
||||
)?;
|
||||
real_fields.push(input);
|
||||
}
|
||||
|
||||
Ok(JsonPathProcessor {
|
||||
fields: real_fields,
|
||||
json_path: self.json_path,
|
||||
ignore_missing: self.ignore_missing,
|
||||
result_idex: self.result_idex,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl ProcessorBuilder for JsonPathProcessorBuilder {
|
||||
fn output_keys(&self) -> HashSet<&str> {
|
||||
self.fields
|
||||
.iter()
|
||||
.map(|f| f.target_or_input_field())
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn input_keys(&self) -> HashSet<&str> {
|
||||
self.fields.iter().map(|f| f.input_field()).collect()
|
||||
}
|
||||
|
||||
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind> {
|
||||
self.build(intermediate_keys).map(ProcessorKind::JsonPath)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&yaml_rust::yaml::Hash> for JsonPathProcessorBuilder {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
|
||||
let mut fields = Fields::default();
|
||||
let mut ignore_missing = false;
|
||||
let mut json_path = None;
|
||||
let mut result_idex = None;
|
||||
|
||||
for (k, v) in value.iter() {
|
||||
let key = k
|
||||
.as_str()
|
||||
.with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
|
||||
match key {
|
||||
FIELD_NAME => {
|
||||
fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
|
||||
}
|
||||
FIELDS_NAME => {
|
||||
fields = yaml_new_fields(v, FIELDS_NAME)?;
|
||||
}
|
||||
|
||||
IGNORE_MISSING_NAME => {
|
||||
ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
|
||||
}
|
||||
JSON_PATH_RESULT_INDEX_NAME => {
|
||||
result_idex = Some(v.as_i64().context(JsonPathParseResultIndexSnafu)? as usize);
|
||||
}
|
||||
|
||||
JSON_PATH_NAME => {
|
||||
let json_path_str = yaml_string(v, JSON_PATH_NAME)?;
|
||||
json_path = Some(
|
||||
JsonPath::try_from(json_path_str.as_str()).context(JsonPathParseSnafu)?,
|
||||
);
|
||||
}
|
||||
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
if let Some(json_path) = json_path {
|
||||
let processor = JsonPathProcessorBuilder {
|
||||
fields,
|
||||
json_path,
|
||||
ignore_missing,
|
||||
result_idex,
|
||||
};
|
||||
|
||||
Ok(processor)
|
||||
} else {
|
||||
ProcessorMissingFieldSnafu {
|
||||
processor: PROCESSOR_JSON_PATH,
|
||||
field: JSON_PATH_NAME,
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct JsonPathProcessor {
|
||||
fields: Vec<OneInputOneOutputField>,
|
||||
json_path: JsonPath<Value>,
|
||||
ignore_missing: bool,
|
||||
result_idex: Option<usize>,
|
||||
}
|
||||
|
||||
impl Default for JsonPathProcessor {
|
||||
fn default() -> Self {
|
||||
JsonPathProcessor {
|
||||
fields: vec![],
|
||||
json_path: JsonPath::try_from("$").unwrap(),
|
||||
ignore_missing: false,
|
||||
result_idex: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl JsonPathProcessor {
|
||||
fn process_field(&self, val: &Value) -> Result<Value> {
|
||||
let processed = self.json_path.find(val);
|
||||
match processed {
|
||||
Value::Array(arr) => {
|
||||
if let Some(index) = self.result_idex {
|
||||
Ok(arr.get(index).cloned().unwrap_or(Value::Null))
|
||||
} else {
|
||||
Ok(Value::Array(arr))
|
||||
}
|
||||
}
|
||||
v => Ok(v),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Processor for JsonPathProcessor {
|
||||
fn kind(&self) -> &str {
|
||||
PROCESSOR_JSON_PATH
|
||||
}
|
||||
|
||||
fn ignore_missing(&self) -> bool {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut Vec<Value>) -> Result<()> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_index();
|
||||
match val.get(index) {
|
||||
Some(v) => {
|
||||
let processed = self.process_field(v)?;
|
||||
|
||||
let output_index = field.output_index();
|
||||
val[output_index] = processed;
|
||||
}
|
||||
None => {
|
||||
if !self.ignore_missing {
|
||||
return ProcessorMissingFieldSnafu {
|
||||
processor: self.kind(),
|
||||
field: field.input_name(),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use crate::Map;
|
||||
|
||||
#[test]
|
||||
fn test_json_path() {
|
||||
use super::*;
|
||||
use crate::Value;
|
||||
|
||||
let json_path = JsonPath::try_from("$.hello").unwrap();
|
||||
let processor = JsonPathProcessor {
|
||||
json_path,
|
||||
result_idex: Some(0),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let result = processor
|
||||
.process_field(&Value::Map(Map::one(
|
||||
"hello",
|
||||
Value::String("world".to_string()),
|
||||
)))
|
||||
.unwrap();
|
||||
assert_eq!(result, Value::String("world".to_string()));
|
||||
}
|
||||
}
|
||||
@@ -20,7 +20,10 @@ use std::collections::BTreeMap;
|
||||
|
||||
pub use array::Array;
|
||||
use jsonb::{Number as JsonbNumber, Object as JsonbObject, Value as JsonbValue};
|
||||
use jsonpath_rust::path::{JsonLike, Path};
|
||||
use jsonpath_rust::{jsp_idx, jsp_obj};
|
||||
pub use map::Map;
|
||||
use regex::Regex;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
pub use time::Timestamp;
|
||||
|
||||
@@ -35,10 +38,11 @@ use crate::etl::error::{Error, Result};
|
||||
/// acts as value: the enclosed value is the actual value
|
||||
/// acts as type: the enclosed value is the default value
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
#[derive(Debug, Clone, PartialEq, Default)]
|
||||
pub enum Value {
|
||||
// as value: null
|
||||
// as type: no type specified
|
||||
#[default]
|
||||
Null,
|
||||
|
||||
Int8(i8),
|
||||
@@ -230,6 +234,36 @@ impl Value {
|
||||
Value::Null => "null",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get(&self, key: &str) -> Option<&Self> {
|
||||
match self {
|
||||
Value::Map(map) => map.get(key),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn as_str(&self) -> Option<&str> {
|
||||
match self {
|
||||
Value::String(v) => Some(v),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn as_f64(&self) -> Option<f64> {
|
||||
match self {
|
||||
Value::Float32(v) => Some(*v as f64),
|
||||
Value::Float64(v) => Some(*v),
|
||||
Value::Uint64(v) => Some(*v as f64),
|
||||
Value::Uint32(v) => Some(*v as f64),
|
||||
Value::Uint16(v) => Some(*v as f64),
|
||||
Value::Uint8(v) => Some(*v as f64),
|
||||
Value::Int64(v) => Some(*v as f64),
|
||||
Value::Int32(v) => Some(*v as f64),
|
||||
Value::Int16(v) => Some(*v as f64),
|
||||
Value::Int8(v) => Some(*v as f64),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Value {
|
||||
@@ -410,3 +444,352 @@ impl From<Value> for JsonbValue<'_> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<String> for Value {
|
||||
fn from(value: String) -> Self {
|
||||
Value::String(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&str> for Value {
|
||||
fn from(value: &str) -> Self {
|
||||
Value::String(value.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<i64> for Value {
|
||||
fn from(value: i64) -> Self {
|
||||
Value::Int64(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<f64> for Value {
|
||||
fn from(value: f64) -> Self {
|
||||
Value::Float64(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Vec<String>> for Value {
|
||||
fn from(value: Vec<String>) -> Self {
|
||||
Value::Array(Array {
|
||||
values: value.into_iter().map(Value::String).collect(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Vec<Self>> for Value {
|
||||
fn from(value: Vec<Self>) -> Self {
|
||||
Value::Array(Array { values: value })
|
||||
}
|
||||
}
|
||||
|
||||
impl From<bool> for Value {
|
||||
fn from(value: bool) -> Self {
|
||||
Value::Boolean(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl JsonLike for Value {
|
||||
fn get(&self, key: &str) -> Option<&Self> {
|
||||
self.get(key)
|
||||
}
|
||||
|
||||
fn itre(&self, pref: String) -> Vec<jsonpath_rust::JsonPathValue<Self>> {
|
||||
let res = match self {
|
||||
Value::Array(elems) => {
|
||||
let mut res = vec![];
|
||||
for (idx, el) in elems.iter().enumerate() {
|
||||
res.push(jsonpath_rust::JsonPathValue::Slice(
|
||||
el,
|
||||
jsonpath_rust::jsp_idx(&pref, idx),
|
||||
));
|
||||
}
|
||||
res
|
||||
}
|
||||
Value::Map(elems) => {
|
||||
let mut res = vec![];
|
||||
for (key, el) in elems.iter() {
|
||||
res.push(jsonpath_rust::JsonPathValue::Slice(
|
||||
el,
|
||||
jsonpath_rust::jsp_obj(&pref, key),
|
||||
));
|
||||
}
|
||||
res
|
||||
}
|
||||
_ => vec![],
|
||||
};
|
||||
if res.is_empty() {
|
||||
vec![jsonpath_rust::JsonPathValue::NoValue]
|
||||
} else {
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
fn array_len(&self) -> jsonpath_rust::JsonPathValue<'static, Self> {
|
||||
match self {
|
||||
Value::Array(elems) => {
|
||||
jsonpath_rust::JsonPathValue::NewValue(Value::Int64(elems.len() as i64))
|
||||
}
|
||||
_ => jsonpath_rust::JsonPathValue::NoValue,
|
||||
}
|
||||
}
|
||||
|
||||
fn init_with_usize(cnt: usize) -> Self {
|
||||
Value::Int64(cnt as i64)
|
||||
}
|
||||
|
||||
fn deep_flatten(&self, pref: String) -> Vec<(&Self, String)> {
|
||||
let mut acc = vec![];
|
||||
match self {
|
||||
Value::Map(elems) => {
|
||||
for (f, v) in elems.iter() {
|
||||
let pref = jsp_obj(&pref, f);
|
||||
acc.push((v, pref.clone()));
|
||||
acc.append(&mut v.deep_flatten(pref));
|
||||
}
|
||||
}
|
||||
Value::Array(elems) => {
|
||||
for (i, v) in elems.iter().enumerate() {
|
||||
let pref = jsp_idx(&pref, i);
|
||||
acc.push((v, pref.clone()));
|
||||
acc.append(&mut v.deep_flatten(pref));
|
||||
}
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
acc
|
||||
}
|
||||
|
||||
fn deep_path_by_key<'a>(
|
||||
&'a self,
|
||||
key: jsonpath_rust::path::ObjectField<'a, Self>,
|
||||
pref: String,
|
||||
) -> Vec<(&'a Self, String)> {
|
||||
let mut result: Vec<(&'a Value, String)> = jsonpath_rust::JsonPathValue::vec_as_pair(
|
||||
key.find(jsonpath_rust::JsonPathValue::new_slice(self, pref.clone())),
|
||||
);
|
||||
match self {
|
||||
Value::Map(elems) => {
|
||||
let mut next_levels: Vec<(&'a Value, String)> = elems
|
||||
.iter()
|
||||
.flat_map(|(k, v)| v.deep_path_by_key(key.clone(), jsp_obj(&pref, k)))
|
||||
.collect();
|
||||
result.append(&mut next_levels);
|
||||
result
|
||||
}
|
||||
Value::Array(elems) => {
|
||||
let mut next_levels: Vec<(&'a Value, String)> = elems
|
||||
.iter()
|
||||
.enumerate()
|
||||
.flat_map(|(i, v)| v.deep_path_by_key(key.clone(), jsp_idx(&pref, i)))
|
||||
.collect();
|
||||
result.append(&mut next_levels);
|
||||
result
|
||||
}
|
||||
_ => result,
|
||||
}
|
||||
}
|
||||
|
||||
fn as_u64(&self) -> Option<u64> {
|
||||
match self {
|
||||
Value::Uint64(v) => Some(*v),
|
||||
Value::Uint32(v) => Some(*v as u64),
|
||||
Value::Uint16(v) => Some(*v as u64),
|
||||
Value::Uint8(v) => Some(*v as u64),
|
||||
Value::Int64(v) if *v >= 0 => Some(*v as u64),
|
||||
Value::Int32(v) if *v >= 0 => Some(*v as u64),
|
||||
Value::Int16(v) if *v >= 0 => Some(*v as u64),
|
||||
Value::Int8(v) if *v >= 0 => Some(*v as u64),
|
||||
Value::Float64(v) if *v >= 0.0 => Some(*v as u64),
|
||||
Value::Float32(v) if *v >= 0.0 => Some(*v as u64),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn is_array(&self) -> bool {
|
||||
matches!(self, Value::Array(_))
|
||||
}
|
||||
|
||||
fn as_array(&self) -> Option<&Vec<Self>> {
|
||||
match self {
|
||||
Value::Array(arr) => Some(&arr.values),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn size(left: Vec<&Self>, right: Vec<&Self>) -> bool {
|
||||
if let Some(v) = right.first() {
|
||||
let sz = match v {
|
||||
Value::Int64(n) => *n as usize,
|
||||
Value::Int32(n) => *n as usize,
|
||||
Value::Int16(n) => *n as usize,
|
||||
Value::Int8(n) => *n as usize,
|
||||
|
||||
Value::Uint64(n) => *n as usize,
|
||||
Value::Uint32(n) => *n as usize,
|
||||
Value::Uint16(n) => *n as usize,
|
||||
Value::Uint8(n) => *n as usize,
|
||||
Value::Float32(n) => *n as usize,
|
||||
Value::Float64(n) => *n as usize,
|
||||
_ => return false,
|
||||
};
|
||||
for el in left.iter() {
|
||||
match el {
|
||||
Value::String(v) if v.len() == sz => true,
|
||||
Value::Array(elems) if elems.len() == sz => true,
|
||||
Value::Map(fields) if fields.len() == sz => true,
|
||||
_ => return false,
|
||||
};
|
||||
}
|
||||
return true;
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
fn sub_set_of(left: Vec<&Self>, right: Vec<&Self>) -> bool {
|
||||
if left.is_empty() {
|
||||
return true;
|
||||
}
|
||||
if right.is_empty() {
|
||||
return false;
|
||||
}
|
||||
|
||||
if let Some(elems) = left.first().and_then(|e| e.as_array()) {
|
||||
if let Some(Value::Array(right_elems)) = right.first() {
|
||||
if right_elems.is_empty() {
|
||||
return false;
|
||||
}
|
||||
|
||||
for el in elems {
|
||||
let mut res = false;
|
||||
|
||||
for r in right_elems.iter() {
|
||||
if el.eq(r) {
|
||||
res = true
|
||||
}
|
||||
}
|
||||
if !res {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
fn any_of(left: Vec<&Self>, right: Vec<&Self>) -> bool {
|
||||
if left.is_empty() {
|
||||
return true;
|
||||
}
|
||||
if right.is_empty() {
|
||||
return false;
|
||||
}
|
||||
|
||||
if let Some(Value::Array(elems)) = right.first() {
|
||||
if elems.is_empty() {
|
||||
return false;
|
||||
}
|
||||
|
||||
for el in left.iter() {
|
||||
if let Some(left_elems) = el.as_array() {
|
||||
for l in left_elems.iter() {
|
||||
for r in elems.iter() {
|
||||
if l.eq(r) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for r in elems.iter() {
|
||||
if el.eq(&r) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
fn regex(left: Vec<&Self>, right: Vec<&Self>) -> bool {
|
||||
if left.is_empty() || right.is_empty() {
|
||||
return false;
|
||||
}
|
||||
|
||||
match right.first() {
|
||||
Some(Value::String(str)) => {
|
||||
if let Ok(regex) = Regex::new(str) {
|
||||
for el in left.iter() {
|
||||
if let Some(v) = el.as_str() {
|
||||
if regex.is_match(v) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn inside(left: Vec<&Self>, right: Vec<&Self>) -> bool {
|
||||
if left.is_empty() {
|
||||
return false;
|
||||
}
|
||||
|
||||
match right.first() {
|
||||
Some(Value::Array(elems)) => {
|
||||
for el in left.iter() {
|
||||
if elems.contains(el) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
Some(Value::Map(elems)) => {
|
||||
for el in left.iter() {
|
||||
for r in elems.values() {
|
||||
if el.eq(&r) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn less(left: Vec<&Self>, right: Vec<&Self>) -> bool {
|
||||
if left.len() == 1 && right.len() == 1 {
|
||||
match (left.first(), right.first()) {
|
||||
(Some(l), Some(r)) => l
|
||||
.as_f64()
|
||||
.and_then(|v1| r.as_f64().map(|v2| v1 < v2))
|
||||
.unwrap_or(false),
|
||||
_ => false,
|
||||
}
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
fn eq(left: Vec<&Self>, right: Vec<&Self>) -> bool {
|
||||
if left.len() != right.len() {
|
||||
false
|
||||
} else {
|
||||
left.iter().zip(right).map(|(a, b)| a.eq(&b)).all(|a| a)
|
||||
}
|
||||
}
|
||||
|
||||
fn array(data: Vec<Self>) -> Self {
|
||||
Value::Array(Array { values: data })
|
||||
}
|
||||
|
||||
fn null() -> Self {
|
||||
Value::Null
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,8 +16,8 @@ use api::v1::value::ValueData;
|
||||
use api::v1::Rows;
|
||||
use common_telemetry::tracing::info;
|
||||
use greptime_proto::v1::value::ValueData::{
|
||||
BoolValue, F64Value, StringValue, TimestampNanosecondValue, TimestampSecondValue, U32Value,
|
||||
U64Value, U8Value,
|
||||
BinaryValue, BoolValue, F64Value, StringValue, TimestampNanosecondValue, TimestampSecondValue,
|
||||
U32Value, U64Value, U8Value,
|
||||
};
|
||||
use greptime_proto::v1::Value as GreptimeValue;
|
||||
use pipeline::{parse, Content, GreptimeTransformer, Pipeline};
|
||||
@@ -518,6 +518,112 @@ transform:
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_json_path() {
|
||||
let input_value_str = r#"
|
||||
{
|
||||
"product_object": {
|
||||
"hello": "world"
|
||||
},
|
||||
"product_array": [
|
||||
"hello",
|
||||
"world"
|
||||
],
|
||||
"complex_object": {
|
||||
"shop": {
|
||||
"orders": [
|
||||
{
|
||||
"id": 1,
|
||||
"active": true
|
||||
},
|
||||
{
|
||||
"id": 2
|
||||
},
|
||||
{
|
||||
"id": 3
|
||||
},
|
||||
{
|
||||
"id": 4,
|
||||
"active": true
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}"#;
|
||||
let input_value = serde_json::from_str::<serde_json::Value>(input_value_str).unwrap();
|
||||
|
||||
let pipeline_yaml = r#"
|
||||
processors:
|
||||
- json_path:
|
||||
fields:
|
||||
- product_object, object_target
|
||||
json_path: "$.hello"
|
||||
result_index: 0
|
||||
- json_path:
|
||||
fields:
|
||||
- product_array, array_target
|
||||
json_path: "$.[1]"
|
||||
result_index: 0
|
||||
- json_path:
|
||||
fields:
|
||||
- complex_object, complex_target1
|
||||
json_path: "$.shop.orders[?(@.active)].id"
|
||||
- json_path:
|
||||
fields:
|
||||
- complex_target1, complex_target_2
|
||||
json_path: "$.[1]"
|
||||
result_index: 0
|
||||
- json_path:
|
||||
fields:
|
||||
- complex_object, complex_target_3
|
||||
json_path: "$.shop.orders[?(@.active)].id"
|
||||
result_index: 1
|
||||
transform:
|
||||
- fields:
|
||||
- object_target
|
||||
- array_target
|
||||
type: string
|
||||
- fields:
|
||||
- complex_target_3
|
||||
- complex_target_2
|
||||
type: uint32
|
||||
- fields:
|
||||
- complex_target1
|
||||
type: json
|
||||
"#;
|
||||
|
||||
let yaml_content = Content::Yaml(pipeline_yaml.into());
|
||||
let pipeline: Pipeline<GreptimeTransformer> = parse(&yaml_content).unwrap();
|
||||
|
||||
let mut status = pipeline.init_intermediate_state();
|
||||
|
||||
pipeline.prepare(input_value, &mut status).unwrap();
|
||||
let row = pipeline.exec_mut(&mut status).unwrap();
|
||||
|
||||
let r = row
|
||||
.values
|
||||
.into_iter()
|
||||
.map(|v| v.value_data.unwrap())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let object_target = r[0].clone();
|
||||
let array_target = r[1].clone();
|
||||
let complex_target3 = r[2].clone();
|
||||
let complex_target2 = r[3].clone();
|
||||
let complex_target1 = r[4].clone();
|
||||
|
||||
assert_eq!(StringValue("world".into()), object_target);
|
||||
assert_eq!(StringValue("world".into()), array_target);
|
||||
assert_eq!(complex_target3, complex_target2);
|
||||
|
||||
assert_eq!(
|
||||
BinaryValue(
|
||||
jsonb::Value::Array(vec![jsonb::Value::from(1), jsonb::Value::from(4),]).to_vec()
|
||||
),
|
||||
complex_target1
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_simple_data() {
|
||||
let input_value_str = r#"
|
||||
|
||||
Reference in New Issue
Block a user