From d7ed6a69aba2fe8e7f0d235ff22355073f355fa2 Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Mon, 27 Oct 2025 11:30:52 +0800 Subject: [PATCH] feat: merge json datatype (#7142) * feat: merge json datatype Signed-off-by: luofucong * resolve PR comments Signed-off-by: luofucong --------- Signed-off-by: luofucong --- src/datatypes/src/data_type.rs | 4 +- src/datatypes/src/error.rs | 10 +- src/datatypes/src/json.rs | 6 +- src/datatypes/src/types/json_type.rs | 191 ++++++++++++++++++++++++- src/datatypes/src/types/struct_type.rs | 2 +- 5 files changed, 204 insertions(+), 9 deletions(-) diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index eb47d30305..bb84e5a30b 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -348,9 +348,9 @@ impl ConcreteDataType { } } - pub fn as_json(&self) -> Option { + pub fn as_json(&self) -> Option<&JsonType> { match self { - ConcreteDataType::Json(j) => Some(j.clone()), + ConcreteDataType::Json(j) => Some(j), _ => None, } } diff --git a/src/datatypes/src/error.rs b/src/datatypes/src/error.rs index 85e78ce1eb..064c78e89d 100644 --- a/src/datatypes/src/error.rs +++ b/src/datatypes/src/error.rs @@ -259,6 +259,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to merge JSON datatype: {reason}"))] + MergeJsonDatatype { + reason: String, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { @@ -281,7 +288,8 @@ impl ErrorExt for Error { | InvalidJsonb { .. } | InvalidVector { .. } | InvalidFulltextOption { .. } - | InvalidSkippingIndexOption { .. } => StatusCode::InvalidArguments, + | InvalidSkippingIndexOption { .. } + | MergeJsonDatatype { .. } => StatusCode::InvalidArguments, ValueExceedsPrecision { .. } | CastType { .. } diff --git a/src/datatypes/src/json.rs b/src/datatypes/src/json.rs index 902b84a131..64952bb39a 100644 --- a/src/datatypes/src/json.rs +++ b/src/datatypes/src/json.rs @@ -30,7 +30,7 @@ use snafu::{ResultExt, ensure}; use crate::data_type::{ConcreteDataType, DataType}; use crate::error::{self, Error}; -use crate::types::{StructField, StructType}; +use crate::types::{ListType, StructField, StructType}; use crate::value::{ListValue, StructValue, Value}; /// The configuration of JSON encoding @@ -375,8 +375,8 @@ fn encode_json_value_with_context<'a>( } Json::Array(arr) => { let list_value = encode_json_array_with_context(arr, expected_type, context)?; - let data_type = list_value.datatype().clone(); - Ok((Value::List(list_value), (*data_type).clone())) + let datatype = ConcreteDataType::List(ListType::new(list_value.datatype())); + Ok((Value::List(list_value), datatype)) } Json::Object(obj) => { let struct_value = encode_json_object_with_context(obj, None, context)?; diff --git a/src/datatypes/src/types/json_type.rs b/src/datatypes/src/types/json_type.rs index 99dcf9c571..141db03728 100644 --- a/src/datatypes/src/types/json_type.rs +++ b/src/datatypes/src/types/json_type.rs @@ -12,7 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::BTreeMap; use std::str::FromStr; +use std::sync::Arc; use arrow::datatypes::DataType as ArrowDataType; use arrow_schema::Fields; @@ -21,10 +23,13 @@ use serde::{Deserialize, Serialize}; use snafu::ResultExt; use crate::data_type::DataType; -use crate::error::{DeserializeSnafu, InvalidJsonSnafu, InvalidJsonbSnafu, Result}; +use crate::error::{ + DeserializeSnafu, InvalidJsonSnafu, InvalidJsonbSnafu, MergeJsonDatatypeSnafu, Result, +}; use crate::prelude::ConcreteDataType; use crate::scalars::ScalarVectorBuilder; use crate::type_id::LogicalTypeId; +use crate::types::{ListType, StructField, StructType}; use crate::value::Value; use crate::vectors::{BinaryVectorBuilder, MutableVector}; @@ -48,11 +53,101 @@ impl JsonType { pub fn new(format: JsonFormat) -> Self { Self { format } } + + // TODO(LFC): remove "allow unused" + #[allow(unused)] + /// Make json type a struct type, by: + /// - if the json is an object, its entries are mapped to struct fields, obviously; + /// - if not, the json is one of bool, number, string or array, make it a special field called + /// "__plain" in a struct with only that field. + pub(crate) fn as_struct_type(&self) -> StructType { + match &self.format { + JsonFormat::Jsonb => StructType::default(), + JsonFormat::Native(inner) => match inner.as_ref() { + ConcreteDataType::Struct(t) => t.clone(), + x => StructType::new(Arc::new(vec![StructField::new( + "__plain".to_string(), + x.clone(), + true, + )])), + }, + } + } + + // TODO(LFC): remove "allow unused" + #[allow(unused)] + /// Try to merge this json type with others, error on datatype conflict. + pub(crate) fn merge(&mut self, other: &JsonType) -> Result<()> { + match (&self.format, &other.format) { + (JsonFormat::Jsonb, JsonFormat::Jsonb) => Ok(()), + (JsonFormat::Native(this), JsonFormat::Native(that)) => { + let merged = merge(this.as_ref(), that.as_ref())?; + self.format = JsonFormat::Native(Box::new(merged)); + Ok(()) + } + _ => MergeJsonDatatypeSnafu { + reason: "json format not match", + } + .fail(), + } + } +} + +fn merge(this: &ConcreteDataType, that: &ConcreteDataType) -> Result { + match (this, that) { + (this, that) if this == that => Ok(this.clone()), + (ConcreteDataType::List(this), ConcreteDataType::List(that)) => { + merge_list(this, that).map(ConcreteDataType::List) + } + (ConcreteDataType::Struct(this), ConcreteDataType::Struct(that)) => { + merge_struct(this, that).map(ConcreteDataType::Struct) + } + (ConcreteDataType::Null(_), x) | (x, ConcreteDataType::Null(_)) => Ok(x.clone()), + _ => MergeJsonDatatypeSnafu { + reason: format!("datatypes have conflict, this: {this}, that: {that}"), + } + .fail(), + } +} + +fn merge_list(this: &ListType, that: &ListType) -> Result { + let merged = merge(this.item_type(), that.item_type())?; + Ok(ListType::new(Arc::new(merged))) +} + +fn merge_struct(this: &StructType, that: &StructType) -> Result { + let this = Arc::unwrap_or_clone(this.fields()); + let that = Arc::unwrap_or_clone(that.fields()); + + let mut this: BTreeMap = this + .into_iter() + .map(|x| (x.name().to_string(), x)) + .collect(); + // merge "that" into "this" directly: + for that_field in that { + let field_name = that_field.name().to_string(); + if let Some(this_field) = this.get(&field_name) { + let merged_field = StructField::new( + field_name.clone(), + merge(this_field.data_type(), that_field.data_type())?, + true, // the value in json object must be always nullable + ); + this.insert(field_name, merged_field); + } else { + this.insert(field_name, that_field); + } + } + + let fields = this.into_values().collect::>(); + Ok(StructType::new(Arc::new(fields))) } impl DataType for JsonType { fn name(&self) -> String { - JSON_TYPE_NAME.to_string() + match &self.format { + JsonFormat::Jsonb => JSON_TYPE_NAME.to_string(), + JsonFormat::Native(x) => format!("Json<{x}>"), + } } fn logical_type_id(&self) -> LogicalTypeId { @@ -106,3 +201,95 @@ pub fn parse_string_to_jsonb(s: &str) -> Result> { .map_err(|_| InvalidJsonSnafu { value: s }.build()) .map(|json| json.to_vec()) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::json::JsonStructureSettings; + + #[test] + fn test_merge_json_type() -> Result<()> { + fn test( + json: &str, + json_type: &mut JsonType, + expected: std::result::Result<&str, &str>, + ) -> Result<()> { + let json: serde_json::Value = serde_json::from_str(json).unwrap(); + + let settings = JsonStructureSettings::Structured(None); + let value = settings.encode(json)?; + let value_type = value.data_type(); + let Some(other) = value_type.as_json() else { + unreachable!() + }; + + let result = json_type.merge(other); + match (result, expected) { + (Ok(()), Ok(expected)) => { + assert_eq!(json_type.name(), expected) + } + (Err(err), Err(expected)) => { + assert_eq!(err.to_string(), expected) + } + _ => unreachable!(), + } + Ok(()) + } + + let json_type = &mut JsonType::new(JsonFormat::Native(Box::new( + ConcreteDataType::null_datatype(), + ))); + + // can merge with json object: + let json = r#"{ + "hello": "world", + "list": [1, 2, 3], + "object": {"a": 1} + }"#; + let expected = + r#"Json, "object": Struct<"a": Int64>>>"#; + test(json, json_type, Ok(expected))?; + + // cannot merge with other non-object json values: + let jsons = [r#""s""#, "1", "[1]"]; + let expects = [ + r#"Failed to merge JSON datatype: datatypes have conflict, this: Struct<"hello": String, "list": List, "object": Struct<"a": Int64>>, that: String"#, + r#"Failed to merge JSON datatype: datatypes have conflict, this: Struct<"hello": String, "list": List, "object": Struct<"a": Int64>>, that: Int64"#, + r#"Failed to merge JSON datatype: datatypes have conflict, this: Struct<"hello": String, "list": List, "object": Struct<"a": Int64>>, that: List"#, + ]; + for (json, expect) in jsons.into_iter().zip(expects.into_iter()) { + test(json, json_type, Err(expect))?; + } + + // cannot merge with other json object with conflict field datatype: + let json = r#"{ + "hello": 1, + "float": 0.123, + "no": 42 + }"#; + let expected = + r#"Failed to merge JSON datatype: datatypes have conflict, this: String, that: Int64"#; + test(json, json_type, Err(expected))?; + + // can merge with another json object: + let json = r#"{ + "hello": "greptime", + "float": 0.123, + "int": 42 + }"#; + let expected = r#"Json, "object": Struct<"a": Int64>>>"#; + test(json, json_type, Ok(expected))?; + + // can merge with some complex nested json object: + let json = r#"{ + "list": [4], + "object": {"foo": "bar", "l": ["x"], "o": {"key": "value"}}, + "float": 0.456, + "int": 0 + }"#; + let expected = r#"Json, "object": Struct<"a": Int64, "foo": String, "l": List, "o": Struct<"key": String>>>>"#; + test(json, json_type, Ok(expected))?; + + Ok(()) + } +} diff --git a/src/datatypes/src/types/struct_type.rs b/src/datatypes/src/types/struct_type.rs index 5e3156498f..c082aeb9e6 100644 --- a/src/datatypes/src/types/struct_type.rs +++ b/src/datatypes/src/types/struct_type.rs @@ -52,7 +52,7 @@ impl DataType for StructType { "Struct<{}>", self.fields .iter() - .map(|f| f.name()) + .map(|f| format!(r#""{}": {}"#, f.name(), f.data_type())) .collect::>() .join(", ") )