feat!: improve greptime_identity pipeline behavior (#6932)

* flatten by default, store arrays as strings

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* expose max_nested_levels param, store a string instead of erroring

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove flatten option

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unused errors

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Author: Ruihang Xia <waynestxia@gmail.com>
Date: 2025-09-25 08:28:28 -07:00
Committed by: GitHub
Parent: 280df064c7
Commit: 0790835c77
3 changed files with 96 additions and 77 deletions

File 1 of 3: pipeline error definitions

@@ -584,12 +584,6 @@ pub enum Error {
     TableSuffixRequiredForDispatcherRule,
     #[snafu(display("Value is required for dispatcher rule"))]
     ValueRequiredForDispatcherRule,
-    #[snafu(display("Reached max nested levels when flattening JSON object: {max_nested_levels}"))]
-    ReachedMaxNestedLevels {
-        max_nested_levels: usize,
-        #[snafu(implicit)]
-        location: Location,
-    },
     #[snafu(display("Pipeline table not found"))]
     PipelineTableNotFound {
@@ -887,7 +881,6 @@ impl ErrorExt for Error {
             | FieldRequiredForDispatcher
             | TableSuffixRequiredForDispatcherRule
             | ValueRequiredForDispatcherRule
-            | ReachedMaxNestedLevels { .. }
             | RequiredTableSuffixTemplate
             | InvalidTableSuffixTemplate { .. }
             | CompileVrl { .. }

File 2 of 3: identity pipeline transform

@@ -30,14 +30,15 @@ use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
 use itertools::Itertools;
 use jsonb::Number;
 use once_cell::sync::OnceCell;
+use serde_json as serde_json_crate;
 use session::context::Channel;
 use snafu::OptionExt;
-use vrl::prelude::VrlValueConvert;
+use vrl::prelude::{Bytes, VrlValueConvert};
 use vrl::value::{KeyString, Value as VrlValue};

 use crate::error::{
-    IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, ReachedMaxNestedLevelsSnafu,
-    Result, TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
+    IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, Result,
+    TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
     TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
 };
 use crate::etl::PipelineDocVersion;
@@ -65,23 +66,24 @@ pub struct GreptimePipelineParams {
     /// This should not be used directly, instead, use the parsed shortcut option values.
     options: HashMap<String, String>,

     /// Parsed shortcut option values
-    pub flatten_json_object: OnceCell<bool>,
     /// Whether to skip error when processing the pipeline.
     pub skip_error: OnceCell<bool>,
+    /// Max nested levels when flattening JSON object. Defaults to
+    /// `DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING` when not provided.
+    pub max_nested_levels: OnceCell<usize>,
 }

 impl GreptimePipelineParams {
     /// Create a `GreptimePipelineParams` from params string which is from the http header with key `x-greptime-pipeline-params`
     /// The params is in the format of `key1=value1&key2=value2`,for example:
-    /// x-greptime-pipeline-params: flatten_json_object=true
+    /// x-greptime-pipeline-params: max_nested_levels=5
    pub fn from_params(params: Option<&str>) -> Self {
         let options = Self::parse_header_str_to_map(params);

         Self {
             options,
             skip_error: OnceCell::new(),
-            flatten_json_object: OnceCell::new(),
+            max_nested_levels: OnceCell::new(),
         }
     }
@@ -89,7 +91,7 @@ impl GreptimePipelineParams {
         Self {
             options,
             skip_error: OnceCell::new(),
-            flatten_json_object: OnceCell::new(),
+            max_nested_levels: OnceCell::new(),
         }
     }
@@ -109,22 +111,24 @@ impl GreptimePipelineParams {
         }
     }

-    /// Whether to flatten the JSON object.
-    pub fn flatten_json_object(&self) -> bool {
-        *self.flatten_json_object.get_or_init(|| {
-            self.options
-                .get("flatten_json_object")
-                .map(|v| v == "true")
-                .unwrap_or(false)
-        })
-    }

     /// Whether to skip error when processing the pipeline.
     pub fn skip_error(&self) -> bool {
         *self
             .skip_error
             .get_or_init(|| self.options.get("skip_error").map(truthy).unwrap_or(false))
     }
+    /// Max nested levels for JSON flattening. If not provided or invalid,
+    /// falls back to `DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING`.
+    pub fn max_nested_levels(&self) -> usize {
+        *self.max_nested_levels.get_or_init(|| {
+            self.options
+                .get("max_nested_levels")
+                .and_then(|s| s.parse::<usize>().ok())
+                .filter(|v| *v > 0)
+                .unwrap_or(DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING)
+        })
+    }
 }

 impl GreptimeTransformer {
@@ -618,19 +622,14 @@ pub fn identity_pipeline(
     pipeline_ctx: &PipelineContext<'_>,
 ) -> Result<HashMap<ContextOpt, Rows>> {
     let skip_error = pipeline_ctx.pipeline_param.skip_error();
-    let input = if pipeline_ctx.pipeline_param.flatten_json_object() {
-        let mut results = Vec::with_capacity(array.len());
-        for item in array.into_iter() {
-            let result = unwrap_or_continue_if_err!(
-                flatten_object(item, DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING),
-                skip_error
-            );
-            results.push(result);
-        }
-        results
-    } else {
-        array
-    };
+    let max_nested_levels = pipeline_ctx.pipeline_param.max_nested_levels();
+    // Always flatten JSON objects and stringify arrays
+    let mut input = Vec::with_capacity(array.len());
+    for item in array.into_iter() {
+        let result =
+            unwrap_or_continue_if_err!(flatten_object(item, max_nested_levels), skip_error);
+        input.push(result);
+    }

     identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
         if let Some(table) = table {
@@ -659,32 +658,52 @@
 /// Consumes the JSON object and consumes it into a single-level object.
 ///
-/// The `max_nested_levels` parameter is used to limit the nested levels of the JSON object.
-/// The error will be returned if the nested levels is greater than the `max_nested_levels`.
+/// The `max_nested_levels` parameter is used to limit how deep to flatten nested JSON objects.
+/// When the maximum level is reached, the remaining nested structure is serialized to a JSON
+/// string and stored at the current flattened key.
 pub fn flatten_object(object: VrlValue, max_nested_levels: usize) -> Result<VrlValue> {
     let mut flattened = BTreeMap::new();
     let object = object.into_object().context(ValueMustBeMapSnafu)?;

     if !object.is_empty() {
         // it will use recursion to flatten the object.
-        do_flatten_object(&mut flattened, None, object, 1, max_nested_levels)?;
+        do_flatten_object(&mut flattened, None, object, 1, max_nested_levels);
     }

     Ok(VrlValue::Object(flattened))
 }

+fn vrl_value_to_serde_json(value: &VrlValue) -> serde_json_crate::Value {
+    match value {
+        VrlValue::Null => serde_json_crate::Value::Null,
+        VrlValue::Boolean(b) => serde_json_crate::Value::Bool(*b),
+        VrlValue::Integer(i) => serde_json_crate::Value::Number((*i).into()),
+        VrlValue::Float(not_nan) => serde_json_crate::Number::from_f64(not_nan.into_inner())
+            .map(serde_json_crate::Value::Number)
+            .unwrap_or(serde_json_crate::Value::Null),
+        VrlValue::Bytes(bytes) => {
+            serde_json_crate::Value::String(String::from_utf8_lossy(bytes).into_owned())
+        }
+        VrlValue::Regex(re) => serde_json_crate::Value::String(re.as_str().to_string()),
+        VrlValue::Timestamp(ts) => serde_json_crate::Value::String(ts.to_rfc3339()),
+        VrlValue::Array(arr) => {
+            serde_json_crate::Value::Array(arr.iter().map(vrl_value_to_serde_json).collect())
+        }
+        VrlValue::Object(map) => serde_json_crate::Value::Object(
+            map.iter()
+                .map(|(k, v)| (k.to_string(), vrl_value_to_serde_json(v)))
+                .collect(),
+        ),
+    }
+}
+
 fn do_flatten_object(
     dest: &mut BTreeMap<KeyString, VrlValue>,
     base: Option<&str>,
     object: BTreeMap<KeyString, VrlValue>,
     current_level: usize,
     max_nested_levels: usize,
-) -> Result<()> {
-    // For safety, we do not allow the depth to be greater than the max_object_depth.
-    if current_level > max_nested_levels {
-        return ReachedMaxNestedLevelsSnafu { max_nested_levels }.fail();
-    }
-
+) {
     for (key, value) in object {
         let new_key = base.map_or_else(
             || key.clone(),
@@ -693,22 +712,35 @@ fn do_flatten_object(
         match value {
             VrlValue::Object(object) => {
-                do_flatten_object(
-                    dest,
-                    Some(&new_key),
-                    object,
-                    current_level + 1,
-                    max_nested_levels,
-                )?;
+                if current_level >= max_nested_levels {
+                    // Reached the maximum level; stringify the remaining object.
+                    let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(
+                        &VrlValue::Object(object),
+                    ))
+                    .unwrap_or_else(|_| String::from("{}"));
+                    dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
+                } else {
+                    do_flatten_object(
+                        dest,
+                        Some(&new_key),
+                        object,
+                        current_level + 1,
+                        max_nested_levels,
+                    );
+                }
             }
-            // For other types, we will directly insert them into as JSON type.
+            // Arrays are stringified to ensure no JSON column types in the result.
+            VrlValue::Array(_) => {
+                let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(&value))
+                    .unwrap_or_else(|_| String::from("[]"));
+                dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
+            }
+            // Other leaf types are inserted as-is.
             _ => {
                 dest.insert(new_key, value);
             }
         }
     }
-
-    Ok(())
 }

 #[cfg(test)]
@@ -920,9 +952,9 @@
                 10,
                 Some(serde_json::json!(
                     {
-                        "a.b.c": [1,2,3],
-                        "d": ["foo","bar"],
-                        "e.f": [7,8,9],
+                        "a.b.c": "[1,2,3]",
+                        "d": "[\"foo\",\"bar\"]",
+                        "e.f": "[7,8,9]",
                         "e.g.h": 123,
                         "e.g.i": "hello",
                         "e.g.j.k": true
@@ -947,7 +979,12 @@
                 }
             ),
             3,
-            None,
+            Some(serde_json::json!(
+                {
+                    "a.b.c": "{\"d\":[1,2,3]}",
+                    "e": "[\"foo\",\"bar\"]"
+                }
+            )),
         ),
     ];
@@ -959,15 +996,4 @@
             assert_eq!(flattened_object, expected);
         }
     }
-
-    #[test]
-    fn test_greptime_pipeline_params() {
-        let params = Some("flatten_json_object=true");
-        let pipeline_params = GreptimePipelineParams::from_params(params);
-        assert!(pipeline_params.flatten_json_object());
-
-        let params = None;
-        let pipeline_params = GreptimePipelineParams::from_params(params);
-        assert!(!pipeline_params.flatten_json_object());
-    }
 }

File 3 of 3: HTTP integration tests

@@ -2333,8 +2333,8 @@ pub async fn test_identity_pipeline(store_type: StorageType) {
     assert_eq!(res.status(), StatusCode::OK);

-    let line1_expected = r#"[null,"10.170.***.***",1453809242,"","10.200.**.***",[1,2,3],{"a":1,"b":2},"200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=<yourAccessKeyId>&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=<yourSignature>HTTP/1.1","aliyun-sdk-java",null,null]"#;
-    let line2_expected = r#"[null,"10.170.***.***",1453809242,"","10.200.**.***",[1,2,3],{"a":1,"b":2},"200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=<yourAccessKeyId>&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=<yourSignature>HTTP/1.1","aliyun-sdk-java","guaguagua","hasagei"]"#;
+    let line1_expected = r#"[null,"10.170.***.***",1453809242,"","10.200.**.***","[1,2,3]",1,2,"200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=<yourAccessKeyId>&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=<yourSignature>HTTP/1.1","aliyun-sdk-java",null,null]"#;
+    let line2_expected = r#"[null,"10.170.***.***",1453809242,"","10.200.**.***","[1,2,3]",1,2,"200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=<yourAccessKeyId>&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=<yourSignature>HTTP/1.1","aliyun-sdk-java","guaguagua","hasagei"]"#;
     let res = client.get("/v1/sql?sql=select * from logs").send().await;
     assert_eq!(res.status(), StatusCode::OK);
     let resp: serde_json::Value = res.json().await;
@@ -2357,7 +2357,7 @@ pub async fn test_identity_pipeline(store_type: StorageType) {
         serde_json::from_str::<Vec<Value>>(line2_expected).unwrap()
     );

-    let expected = r#"[["greptime_timestamp","TimestampNanosecond","PRI","NO","","TIMESTAMP"],["__source__","String","","YES","","FIELD"],["__time__","Int64","","YES","","FIELD"],["__topic__","String","","YES","","FIELD"],["ip","String","","YES","","FIELD"],["json_array","Json","","YES","","FIELD"],["json_object","Json","","YES","","FIELD"],["status","String","","YES","","FIELD"],["time","String","","YES","","FIELD"],["url","String","","YES","","FIELD"],["user-agent","String","","YES","","FIELD"],["dongdongdong","String","","YES","","FIELD"],["hasagei","String","","YES","","FIELD"]]"#;
+    let expected = r#"[["greptime_timestamp","TimestampNanosecond","PRI","NO","","TIMESTAMP"],["__source__","String","","YES","","FIELD"],["__time__","Int64","","YES","","FIELD"],["__topic__","String","","YES","","FIELD"],["ip","String","","YES","","FIELD"],["json_array","String","","YES","","FIELD"],["json_object.a","Int64","","YES","","FIELD"],["json_object.b","Int64","","YES","","FIELD"],["status","String","","YES","","FIELD"],["time","String","","YES","","FIELD"],["url","String","","YES","","FIELD"],["user-agent","String","","YES","","FIELD"],["dongdongdong","String","","YES","","FIELD"],["hasagei","String","","YES","","FIELD"]]"#;
     validate_data("identity_schema", &client, "desc logs", expected).await;

     guard.remove_all().await;
@@ -3352,7 +3352,7 @@ pub async fn test_identity_pipeline_with_flatten(store_type: StorageType) {
     assert_eq!(StatusCode::OK, res.status());

-    let expected = r#"[["greptime_timestamp","TimestampNanosecond","PRI","NO","","TIMESTAMP"],["__source__","String","","YES","","FIELD"],["__time__","Int64","","YES","","FIELD"],["__topic__","String","","YES","","FIELD"],["custom_map.value_a","Json","","YES","","FIELD"],["custom_map.value_b","String","","YES","","FIELD"],["ip","String","","YES","","FIELD"],["status","String","","YES","","FIELD"],["time","String","","YES","","FIELD"],["url","String","","YES","","FIELD"],["user-agent","String","","YES","","FIELD"]]"#;
+    let expected = r#"[["greptime_timestamp","TimestampNanosecond","PRI","NO","","TIMESTAMP"],["__source__","String","","YES","","FIELD"],["__time__","Int64","","YES","","FIELD"],["__topic__","String","","YES","","FIELD"],["custom_map.value_a","String","","YES","","FIELD"],["custom_map.value_b","String","","YES","","FIELD"],["ip","String","","YES","","FIELD"],["status","String","","YES","","FIELD"],["time","String","","YES","","FIELD"],["url","String","","YES","","FIELD"],["user-agent","String","","YES","","FIELD"]]"#;
     validate_data(
         "test_identity_pipeline_with_flatten_desc_logs",
         &client,
@@ -3361,7 +3361,7 @@ pub async fn test_identity_pipeline_with_flatten(store_type: StorageType) {
     )
     .await;

-    let expected = "[[[\"a\",\"b\",\"c\"]]]";
+    let expected = "[[\"[\\\"a\\\",\\\"b\\\",\\\"c\\\"]\"]]";
     validate_data(
         "test_identity_pipeline_with_flatten_select_json",
         &client,