diff --git a/Cargo.lock b/Cargo.lock index 8ababdf21d..28f1bac8c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5898,15 +5898,15 @@ dependencies = [ [[package]] name = "jsonpath-rust" -version = "0.7.3" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69a61b87f6a55cc6c28fed5739dd36b9642321ce63e4a5e4a4715d69106f4a10" +checksum = "0c00ae348f9f8fd2d09f82a98ca381c60df9e0820d8d79fce43e649b4dc3128b" dependencies = [ "pest", "pest_derive", "regex", "serde_json", - "thiserror 1.0.64", + "thiserror 2.0.12", ] [[package]] @@ -8271,7 +8271,7 @@ dependencies = [ "rand", "ring", "rust_decimal", - "thiserror 2.0.6", + "thiserror 2.0.12", "tokio", "tokio-rustls 0.26.0", "tokio-util", @@ -8383,7 +8383,7 @@ dependencies = [ "greptime-proto", "itertools 0.10.5", "jsonb", - "jsonpath-rust 0.7.3", + "jsonpath-rust 0.7.5", "lazy_static", "moka", "once_cell", @@ -11063,7 +11063,7 @@ dependencies = [ "serde_json", "sha2", "smallvec", - "thiserror 2.0.6", + "thiserror 2.0.12", "tokio", "tokio-stream", "tracing", @@ -11148,7 +11148,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror 2.0.6", + "thiserror 2.0.12", "tracing", "whoami", ] @@ -11186,7 +11186,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror 2.0.6", + "thiserror 2.0.12", "tracing", "whoami", ] @@ -11967,11 +11967,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.6" +version = "2.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fec2a1820ebd077e2b90c4df007bebf344cd394098a13c563957d0afc83ea47" +checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" dependencies = [ - "thiserror-impl 2.0.6", + "thiserror-impl 2.0.12", ] [[package]] @@ -11987,9 +11987,9 @@ dependencies = [ [[package]] name = "thiserror-impl" -version = "2.0.6" +version = "2.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d65750cab40f4ff1929fb1ba509e9914eb756131cef4210da8d5d700d26f6312" +checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" dependencies = [ "proc-macro2", "quote", diff --git a/src/pipeline/Cargo.toml b/src/pipeline/Cargo.toml index ceb3a992b2..4c2a5e9945 100644 --- a/src/pipeline/Cargo.toml +++ b/src/pipeline/Cargo.toml @@ -41,7 +41,7 @@ futures.workspace = true greptime-proto.workspace = true itertools.workspace = true jsonb.workspace = true -jsonpath-rust = "0.7.3" +jsonpath-rust = "0.7.5" lazy_static.workspace = true moka = { workspace = true, features = ["sync"] } once_cell.workspace = true diff --git a/src/pipeline/src/etl/value.rs b/src/pipeline/src/etl/value.rs index 124d598d9b..cfe774f8bf 100644 --- a/src/pipeline/src/etl/value.rs +++ b/src/pipeline/src/etl/value.rs @@ -16,10 +16,13 @@ pub mod array; pub mod map; pub mod time; +use std::result::Result as StdResult; + pub use array::Array; use jsonb::{Number as JsonbNumber, Object as JsonbObject, Value as JsonbValue}; +use jsonpath_rust::parser::{parse_json_path, JsonPathIndex}; use jsonpath_rust::path::{JsonLike, Path}; -use jsonpath_rust::{jsp_idx, jsp_obj}; +use jsonpath_rust::{jsp_idx, jsp_obj, JsonPath, JsonPathParserError, JsonPathStr}; pub use map::Map; use regex::Regex; use snafu::{OptionExt, ResultExt}; @@ -286,6 +289,52 @@ impl Value { _ => None, } } + + // ref https://github.com/serde-rs/json/blob/master/src/value/mod.rs#L779 + pub fn pointer(&self, pointer: &str) -> Option<&Value> { + if pointer.is_empty() { + return Some(self); + } + if !pointer.starts_with('/') { + return None; + } + pointer + .split('/') + .skip(1) + .map(|x| x.replace("~1", "/").replace("~0", "~")) + .try_fold(self, |target, token| match target { + Value::Map(map) => map.get(&token), + Value::Array(list) => parse_index(&token).and_then(|x| list.get(x)), + _ => None, + }) + } + + // ref https://github.com/serde-rs/json/blob/master/src/value/mod.rs#L834 + pub fn pointer_mut(&mut self, pointer: &str) -> Option<&mut Value> { + if pointer.is_empty() { + return Some(self); + } + if !pointer.starts_with('/') { + return None; + } + pointer + .split('/') + .skip(1) + .map(|x| x.replace("~1", "/").replace("~0", "~")) + .try_fold(self, |target, token| match target { + Value::Map(map) => map.get_mut(&token), + Value::Array(list) => parse_index(&token).and_then(move |x| list.get_mut(x)), + _ => None, + }) + } +} + +// ref https://github.com/serde-rs/json/blob/master/src/value/mod.rs#L259 +fn parse_index(s: &str) -> Option { + if s.starts_with('+') || (s.starts_with('0') && s.len() != 1) { + return None; + } + s.parse().ok() } impl std::fmt::Display for Value { @@ -814,4 +863,46 @@ impl JsonLike for Value { fn null() -> Self { Value::Null } + + // ref https://github.com/besok/jsonpath-rust/blob/main/src/path/mod.rs#L423 + fn reference( + &self, + path: T, + ) -> std::result::Result, JsonPathParserError> + where + T: Into, + { + Ok(self.pointer(&path_to_json_path(path.into())?)) + } + + // https://github.com/besok/jsonpath-rust/blob/main/src/path/mod.rs#L430 + fn reference_mut( + &mut self, + path: T, + ) -> std::result::Result, JsonPathParserError> + where + T: Into, + { + Ok(self.pointer_mut(&path_to_json_path(path.into())?)) + } +} + +// ref https://github.com/besok/jsonpath-rust/blob/main/src/path/mod.rs#L438 +fn path_to_json_path(path: JsonPathStr) -> StdResult { + convert_part(&parse_json_path(path.as_str())?) +} + +// https://github.com/besok/jsonpath-rust/blob/main/src/path/mod.rs#L442 +fn convert_part(path: &JsonPath) -> StdResult { + match path { + JsonPath::Chain(elems) => elems + .iter() + .map(convert_part) + .collect::>(), + + JsonPath::Index(JsonPathIndex::Single(v)) => Ok(format!("/{}", v)), + JsonPath::Field(e) => Ok(format!("/{}", e)), + JsonPath::Root => Ok("".to_string()), + e => Err(JsonPathParserError::InvalidJsonPath(e.to_string())), + } }