From 0790835c773c68c94d257728b91437a3398b372e Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 25 Sep 2025 08:28:28 -0700 Subject: [PATCH] feat!: improve `greptime_identity` pipeline behavior (#6932) * flat by default, store array in string Signed-off-by: Ruihang Xia * expose max_nested_levels param, store string instead of error Signed-off-by: Ruihang Xia * remove flatten option Signed-off-by: Ruihang Xia * remove unused errors Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/pipeline/src/error.rs | 7 - .../src/etl/transform/transformer/greptime.rs | 156 ++++++++++-------- tests-integration/tests/http.rs | 10 +- 3 files changed, 96 insertions(+), 77 deletions(-) diff --git a/src/pipeline/src/error.rs b/src/pipeline/src/error.rs index ca26c9f5d7..73ffb711a1 100644 --- a/src/pipeline/src/error.rs +++ b/src/pipeline/src/error.rs @@ -584,12 +584,6 @@ pub enum Error { TableSuffixRequiredForDispatcherRule, #[snafu(display("Value is required for dispatcher rule"))] ValueRequiredForDispatcherRule, - #[snafu(display("Reached max nested levels when flattening JSON object: {max_nested_levels}"))] - ReachedMaxNestedLevels { - max_nested_levels: usize, - #[snafu(implicit)] - location: Location, - }, #[snafu(display("Pipeline table not found"))] PipelineTableNotFound { @@ -887,7 +881,6 @@ impl ErrorExt for Error { | FieldRequiredForDispatcher | TableSuffixRequiredForDispatcherRule | ValueRequiredForDispatcherRule - | ReachedMaxNestedLevels { .. } | RequiredTableSuffixTemplate | InvalidTableSuffixTemplate { .. } | CompileVrl { .. } diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index 5673c09d53..de98213972 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -30,14 +30,15 @@ use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue}; use itertools::Itertools; use jsonb::Number; use once_cell::sync::OnceCell; +use serde_json as serde_json_crate; use session::context::Channel; use snafu::OptionExt; -use vrl::prelude::VrlValueConvert; +use vrl::prelude::{Bytes, VrlValueConvert}; use vrl::value::{KeyString, Value as VrlValue}; use crate::error::{ - IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, ReachedMaxNestedLevelsSnafu, - Result, TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu, + IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, Result, + TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu, TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu, }; use crate::etl::PipelineDocVersion; @@ -65,23 +66,24 @@ pub struct GreptimePipelineParams { /// This should not be used directly, instead, use the parsed shortcut option values. options: HashMap, - /// Parsed shortcut option values - pub flatten_json_object: OnceCell, /// Whether to skip error when processing the pipeline. pub skip_error: OnceCell, + /// Max nested levels when flattening JSON object. Defaults to + /// `DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING` when not provided. + pub max_nested_levels: OnceCell, } impl GreptimePipelineParams { /// Create a `GreptimePipelineParams` from params string which is from the http header with key `x-greptime-pipeline-params` /// The params is in the format of `key1=value1&key2=value2`,for example: - /// x-greptime-pipeline-params: flatten_json_object=true + /// x-greptime-pipeline-params: max_nested_levels=5 pub fn from_params(params: Option<&str>) -> Self { let options = Self::parse_header_str_to_map(params); Self { options, skip_error: OnceCell::new(), - flatten_json_object: OnceCell::new(), + max_nested_levels: OnceCell::new(), } } @@ -89,7 +91,7 @@ impl GreptimePipelineParams { Self { options, skip_error: OnceCell::new(), - flatten_json_object: OnceCell::new(), + max_nested_levels: OnceCell::new(), } } @@ -109,22 +111,24 @@ impl GreptimePipelineParams { } } - /// Whether to flatten the JSON object. - pub fn flatten_json_object(&self) -> bool { - *self.flatten_json_object.get_or_init(|| { - self.options - .get("flatten_json_object") - .map(|v| v == "true") - .unwrap_or(false) - }) - } - /// Whether to skip error when processing the pipeline. pub fn skip_error(&self) -> bool { *self .skip_error .get_or_init(|| self.options.get("skip_error").map(truthy).unwrap_or(false)) } + + /// Max nested levels for JSON flattening. If not provided or invalid, + /// falls back to `DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING`. + pub fn max_nested_levels(&self) -> usize { + *self.max_nested_levels.get_or_init(|| { + self.options + .get("max_nested_levels") + .and_then(|s| s.parse::().ok()) + .filter(|v| *v > 0) + .unwrap_or(DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING) + }) + } } impl GreptimeTransformer { @@ -618,19 +622,14 @@ pub fn identity_pipeline( pipeline_ctx: &PipelineContext<'_>, ) -> Result> { let skip_error = pipeline_ctx.pipeline_param.skip_error(); - let input = if pipeline_ctx.pipeline_param.flatten_json_object() { - let mut results = Vec::with_capacity(array.len()); - for item in array.into_iter() { - let result = unwrap_or_continue_if_err!( - flatten_object(item, DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING), - skip_error - ); - results.push(result); - } - results - } else { - array - }; + let max_nested_levels = pipeline_ctx.pipeline_param.max_nested_levels(); + // Always flatten JSON objects and stringify arrays + let mut input = Vec::with_capacity(array.len()); + for item in array.into_iter() { + let result = + unwrap_or_continue_if_err!(flatten_object(item, max_nested_levels), skip_error); + input.push(result); + } identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| { if let Some(table) = table { @@ -659,32 +658,52 @@ pub fn identity_pipeline( /// Consumes the JSON object and consumes it into a single-level object. /// -/// The `max_nested_levels` parameter is used to limit the nested levels of the JSON object. -/// The error will be returned if the nested levels is greater than the `max_nested_levels`. +/// The `max_nested_levels` parameter is used to limit how deep to flatten nested JSON objects. +/// When the maximum level is reached, the remaining nested structure is serialized to a JSON +/// string and stored at the current flattened key. pub fn flatten_object(object: VrlValue, max_nested_levels: usize) -> Result { let mut flattened = BTreeMap::new(); let object = object.into_object().context(ValueMustBeMapSnafu)?; if !object.is_empty() { // it will use recursion to flatten the object. - do_flatten_object(&mut flattened, None, object, 1, max_nested_levels)?; + do_flatten_object(&mut flattened, None, object, 1, max_nested_levels); } Ok(VrlValue::Object(flattened)) } +fn vrl_value_to_serde_json(value: &VrlValue) -> serde_json_crate::Value { + match value { + VrlValue::Null => serde_json_crate::Value::Null, + VrlValue::Boolean(b) => serde_json_crate::Value::Bool(*b), + VrlValue::Integer(i) => serde_json_crate::Value::Number((*i).into()), + VrlValue::Float(not_nan) => serde_json_crate::Number::from_f64(not_nan.into_inner()) + .map(serde_json_crate::Value::Number) + .unwrap_or(serde_json_crate::Value::Null), + VrlValue::Bytes(bytes) => { + serde_json_crate::Value::String(String::from_utf8_lossy(bytes).into_owned()) + } + VrlValue::Regex(re) => serde_json_crate::Value::String(re.as_str().to_string()), + VrlValue::Timestamp(ts) => serde_json_crate::Value::String(ts.to_rfc3339()), + VrlValue::Array(arr) => { + serde_json_crate::Value::Array(arr.iter().map(vrl_value_to_serde_json).collect()) + } + VrlValue::Object(map) => serde_json_crate::Value::Object( + map.iter() + .map(|(k, v)| (k.to_string(), vrl_value_to_serde_json(v))) + .collect(), + ), + } +} + fn do_flatten_object( dest: &mut BTreeMap, base: Option<&str>, object: BTreeMap, current_level: usize, max_nested_levels: usize, -) -> Result<()> { - // For safety, we do not allow the depth to be greater than the max_object_depth. - if current_level > max_nested_levels { - return ReachedMaxNestedLevelsSnafu { max_nested_levels }.fail(); - } - +) { for (key, value) in object { let new_key = base.map_or_else( || key.clone(), @@ -693,22 +712,35 @@ fn do_flatten_object( match value { VrlValue::Object(object) => { - do_flatten_object( - dest, - Some(&new_key), - object, - current_level + 1, - max_nested_levels, - )?; + if current_level >= max_nested_levels { + // Reached the maximum level; stringify the remaining object. + let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json( + &VrlValue::Object(object), + )) + .unwrap_or_else(|_| String::from("{}")); + dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string))); + } else { + do_flatten_object( + dest, + Some(&new_key), + object, + current_level + 1, + max_nested_levels, + ); + } } - // For other types, we will directly insert them into as JSON type. + // Arrays are stringified to ensure no JSON column types in the result. + VrlValue::Array(_) => { + let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(&value)) + .unwrap_or_else(|_| String::from("[]")); + dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string))); + } + // Other leaf types are inserted as-is. _ => { dest.insert(new_key, value); } } } - - Ok(()) } #[cfg(test)] @@ -920,9 +952,9 @@ mod tests { 10, Some(serde_json::json!( { - "a.b.c": [1,2,3], - "d": ["foo","bar"], - "e.f": [7,8,9], + "a.b.c": "[1,2,3]", + "d": "[\"foo\",\"bar\"]", + "e.f": "[7,8,9]", "e.g.h": 123, "e.g.i": "hello", "e.g.j.k": true @@ -947,7 +979,12 @@ mod tests { } ), 3, - None, + Some(serde_json::json!( + { + "a.b.c": "{\"d\":[1,2,3]}", + "e": "[\"foo\",\"bar\"]" + } + )), ), ]; @@ -959,15 +996,4 @@ mod tests { assert_eq!(flattened_object, expected); } } - - #[test] - fn test_greptime_pipeline_params() { - let params = Some("flatten_json_object=true"); - let pipeline_params = GreptimePipelineParams::from_params(params); - assert!(pipeline_params.flatten_json_object()); - - let params = None; - let pipeline_params = GreptimePipelineParams::from_params(params); - assert!(!pipeline_params.flatten_json_object()); - } } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index c8e1a00aee..5e4adb024b 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -2333,8 +2333,8 @@ pub async fn test_identity_pipeline(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); - let line1_expected = r#"[null,"10.170.***.***",1453809242,"","10.200.**.***",[1,2,3],{"a":1,"b":2},"200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=HTTP/1.1","aliyun-sdk-java",null,null]"#; - let line2_expected = r#"[null,"10.170.***.***",1453809242,"","10.200.**.***",[1,2,3],{"a":1,"b":2},"200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=HTTP/1.1","aliyun-sdk-java","guaguagua","hasagei"]"#; + let line1_expected = r#"[null,"10.170.***.***",1453809242,"","10.200.**.***","[1,2,3]",1,2,"200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=HTTP/1.1","aliyun-sdk-java",null,null]"#; + let line2_expected = r#"[null,"10.170.***.***",1453809242,"","10.200.**.***","[1,2,3]",1,2,"200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=HTTP/1.1","aliyun-sdk-java","guaguagua","hasagei"]"#; let res = client.get("/v1/sql?sql=select * from logs").send().await; assert_eq!(res.status(), StatusCode::OK); let resp: serde_json::Value = res.json().await; @@ -2357,7 +2357,7 @@ pub async fn test_identity_pipeline(store_type: StorageType) { serde_json::from_str::>(line2_expected).unwrap() ); - let expected = r#"[["greptime_timestamp","TimestampNanosecond","PRI","NO","","TIMESTAMP"],["__source__","String","","YES","","FIELD"],["__time__","Int64","","YES","","FIELD"],["__topic__","String","","YES","","FIELD"],["ip","String","","YES","","FIELD"],["json_array","Json","","YES","","FIELD"],["json_object","Json","","YES","","FIELD"],["status","String","","YES","","FIELD"],["time","String","","YES","","FIELD"],["url","String","","YES","","FIELD"],["user-agent","String","","YES","","FIELD"],["dongdongdong","String","","YES","","FIELD"],["hasagei","String","","YES","","FIELD"]]"#; + let expected = r#"[["greptime_timestamp","TimestampNanosecond","PRI","NO","","TIMESTAMP"],["__source__","String","","YES","","FIELD"],["__time__","Int64","","YES","","FIELD"],["__topic__","String","","YES","","FIELD"],["ip","String","","YES","","FIELD"],["json_array","String","","YES","","FIELD"],["json_object.a","Int64","","YES","","FIELD"],["json_object.b","Int64","","YES","","FIELD"],["status","String","","YES","","FIELD"],["time","String","","YES","","FIELD"],["url","String","","YES","","FIELD"],["user-agent","String","","YES","","FIELD"],["dongdongdong","String","","YES","","FIELD"],["hasagei","String","","YES","","FIELD"]]"#; validate_data("identity_schema", &client, "desc logs", expected).await; guard.remove_all().await; @@ -3352,7 +3352,7 @@ pub async fn test_identity_pipeline_with_flatten(store_type: StorageType) { assert_eq!(StatusCode::OK, res.status()); - let expected = r#"[["greptime_timestamp","TimestampNanosecond","PRI","NO","","TIMESTAMP"],["__source__","String","","YES","","FIELD"],["__time__","Int64","","YES","","FIELD"],["__topic__","String","","YES","","FIELD"],["custom_map.value_a","Json","","YES","","FIELD"],["custom_map.value_b","String","","YES","","FIELD"],["ip","String","","YES","","FIELD"],["status","String","","YES","","FIELD"],["time","String","","YES","","FIELD"],["url","String","","YES","","FIELD"],["user-agent","String","","YES","","FIELD"]]"#; + let expected = r#"[["greptime_timestamp","TimestampNanosecond","PRI","NO","","TIMESTAMP"],["__source__","String","","YES","","FIELD"],["__time__","Int64","","YES","","FIELD"],["__topic__","String","","YES","","FIELD"],["custom_map.value_a","String","","YES","","FIELD"],["custom_map.value_b","String","","YES","","FIELD"],["ip","String","","YES","","FIELD"],["status","String","","YES","","FIELD"],["time","String","","YES","","FIELD"],["url","String","","YES","","FIELD"],["user-agent","String","","YES","","FIELD"]]"#; validate_data( "test_identity_pipeline_with_flatten_desc_logs", &client, @@ -3361,7 +3361,7 @@ pub async fn test_identity_pipeline_with_flatten(store_type: StorageType) { ) .await; - let expected = "[[[\"a\",\"b\",\"c\"]]]"; + let expected = "[[\"[\\\"a\\\",\\\"b\\\",\\\"c\\\"]\"]]"; validate_data( "test_identity_pipeline_with_flatten_select_json", &client,