diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index fa0f9c3b49..033feda0c5 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -25,6 +25,7 @@ use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, Semant use coerce::{coerce_columns, coerce_value}; use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue}; use itertools::Itertools; +use once_cell::sync::OnceCell; use serde_json::Number; use crate::error::{ @@ -54,8 +55,12 @@ pub struct GreptimeTransformer { /// Parameters that can be used to configure the greptime pipelines. #[derive(Debug, Clone, Default)] pub struct GreptimePipelineParams { - /// The options for configuring the greptime pipelines. - pub options: HashMap, + /// The original options for configuring the greptime pipelines. + /// This should not be used directly, instead, use the parsed shortcut option values. + options: HashMap, + + /// Parsed shortcut option values + pub flatten_json_object: OnceCell, } impl GreptimePipelineParams { @@ -70,15 +75,20 @@ impl GreptimePipelineParams { .map(|(k, v)| (k.to_string(), v.to_string())) .collect::>(); - Self { options } + Self { + options, + flatten_json_object: OnceCell::new(), + } } /// Whether to flatten the JSON object. pub fn flatten_json_object(&self) -> bool { - self.options - .get("flatten_json_object") - .map(|v| v == "true") - .unwrap_or(false) + *self.flatten_json_object.get_or_init(|| { + self.options + .get("flatten_json_object") + .map(|v| v == "true") + .unwrap_or(false) + }) } }