refactor: make json value use json type (#7248)

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
LFC
2025-11-24 10:40:48 +08:00
committed by GitHub
parent 2f447e6f91
commit ff99bce37c
7 changed files with 347 additions and 310 deletions

View File

@@ -442,7 +442,9 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
}),
JsonFormat::Native(inner) => {
let inner_type = ColumnDataTypeWrapper::try_from(*inner.clone())?;
let inner_type = ColumnDataTypeWrapper::try_from(
ConcreteDataType::from(inner.as_ref()),
)?;
Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonNativeType(Box::new(
JsonNativeTypeExtension {
@@ -1710,6 +1712,20 @@ mod tests {
datatype_extension: Some(Box::new(ColumnDataTypeExtension {
type_ext: Some(TypeExt::StructType(StructTypeExtension {
fields: vec![
v1::StructField {
name: "address".to_string(),
datatype: ColumnDataTypeWrapper::string_datatype()
.datatype()
.into(),
datatype_extension: None
},
v1::StructField {
name: "age".to_string(),
datatype: ColumnDataTypeWrapper::int64_datatype()
.datatype()
.into(),
datatype_extension: None
},
v1::StructField {
name: "id".to_string(),
datatype: ColumnDataTypeWrapper::int64_datatype()
@@ -1724,20 +1740,6 @@ mod tests {
.into(),
datatype_extension: None
},
v1::StructField {
name: "age".to_string(),
datatype: ColumnDataTypeWrapper::int32_datatype()
.datatype()
.into(),
datatype_extension: None
},
v1::StructField {
name: "address".to_string(),
datatype: ColumnDataTypeWrapper::string_datatype()
.datatype()
.into(),
datatype_extension: None
}
]
}))
}))

View File

@@ -33,8 +33,8 @@ use crate::types::{
BinaryType, BooleanType, DateType, Decimal128Type, DictionaryType, DurationMicrosecondType,
DurationMillisecondType, DurationNanosecondType, DurationSecondType, DurationType, Float32Type,
Float64Type, Int8Type, Int16Type, Int32Type, Int64Type, IntervalDayTimeType,
IntervalMonthDayNanoType, IntervalType, IntervalYearMonthType, JsonFormat, JsonType, ListType,
NullType, StringType, StructType, TimeMillisecondType, TimeType, TimestampMicrosecondType,
IntervalMonthDayNanoType, IntervalType, IntervalYearMonthType, JsonType, ListType, NullType,
StringType, StructType, TimeMillisecondType, TimeType, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, TimestampType,
UInt8Type, UInt16Type, UInt32Type, UInt64Type, VectorType,
};
@@ -677,7 +677,7 @@ impl ConcreteDataType {
}
pub fn json_native_datatype(inner_type: ConcreteDataType) -> ConcreteDataType {
ConcreteDataType::Json(JsonType::new(JsonFormat::Native(Box::new(inner_type))))
ConcreteDataType::Json(JsonType::new_native((&inner_type).into()))
}
}

View File

@@ -28,9 +28,9 @@ use serde::{Deserialize, Serialize};
use serde_json::{Map, Value as Json};
use snafu::{ResultExt, ensure};
use crate::data_type::{ConcreteDataType, DataType};
use crate::error::{self, Error};
use crate::json::value::{JsonValue, JsonVariant};
use crate::types::json_type::{JsonNativeType, JsonNumberType, JsonObjectType};
use crate::types::{StructField, StructType};
use crate::value::{ListValue, StructValue, Value};
@@ -103,7 +103,7 @@ impl JsonStructureSettings {
pub fn encode_with_type(
&self,
json: Json,
data_type: Option<&ConcreteDataType>,
data_type: Option<&JsonNativeType>,
) -> Result<Value, Error> {
let context = JsonContext {
key_path: String::new(),
@@ -147,7 +147,7 @@ impl<'a> JsonContext<'a> {
/// Main encoding function with key path tracking
pub fn encode_json_with_context<'a>(
json: Json,
data_type: Option<&ConcreteDataType>,
data_type: Option<&JsonNativeType>,
context: &JsonContext<'a>,
) -> Result<JsonValue, Error> {
// Check if the entire encoding should be unstructured
@@ -163,21 +163,28 @@ pub fn encode_json_with_context<'a>(
match json {
Json::Object(json_object) => {
ensure!(
matches!(data_type, Some(ConcreteDataType::Struct(_)) | None),
error::InvalidJsonSnafu {
value: "JSON object can only be encoded to Struct type".to_string(),
let object_type = match data_type.as_ref() {
Some(JsonNativeType::Object(x)) => Some(x),
None => None,
_ => {
return error::InvalidJsonSnafu {
value: "JSON object value must be encoded with object type",
}
.fail();
}
);
let data_type = data_type.and_then(|x| x.as_struct());
encode_json_object_with_context(json_object, data_type, context)
};
encode_json_object_with_context(json_object, object_type, context)
}
Json::Array(json_array) => {
let item_type = if let Some(ConcreteDataType::List(list_type)) = data_type {
Some(list_type.item_type())
} else {
None
let item_type = match data_type.as_ref() {
Some(JsonNativeType::Array(x)) => Some(x.as_ref()),
None => None,
_ => {
return error::InvalidJsonSnafu {
value: "JSON array value must be encoded with array type",
}
.fail();
}
};
encode_json_array_with_context(json_array, item_type, context)
}
@@ -186,14 +193,13 @@ pub fn encode_json_with_context<'a>(
if let Some(expected_type) = data_type {
let value = encode_json_value_with_context(json, Some(expected_type), context)?;
let actual_type = value.json_type().native_type();
if &actual_type == expected_type {
if actual_type == expected_type {
Ok(value)
} else {
Err(error::InvalidJsonSnafu {
value: format!(
"JSON value type {} does not match expected type {}",
actual_type.name(),
expected_type.name()
actual_type, expected_type
),
}
.build())
@@ -207,23 +213,21 @@ pub fn encode_json_with_context<'a>(
fn encode_json_object_with_context<'a>(
mut json_object: Map<String, Json>,
fields: Option<&StructType>,
fields: Option<&JsonObjectType>,
context: &JsonContext<'a>,
) -> Result<JsonValue, Error> {
let mut object = BTreeMap::new();
// First, process fields from the provided schema in their original order
if let Some(fields) = fields {
for field in fields.fields().iter() {
let field_name = field.name();
for (field_name, field_type) in fields {
if let Some(value) = json_object.remove(field_name) {
let field_context = context.with_key(field_name);
let value =
encode_json_value_with_context(value, Some(field.data_type()), &field_context)?;
object.insert(field_name.to_string(), value.into_variant());
encode_json_value_with_context(value, Some(field_type), &field_context)?;
object.insert(field_name.clone(), value.into_variant());
} else {
// Field exists in schema but not in JSON - add null value
object.insert(field_name.to_string(), ().into());
object.insert(field_name.clone(), ().into());
}
}
}
@@ -242,7 +246,7 @@ fn encode_json_object_with_context<'a>(
fn encode_json_array_with_context<'a>(
json_array: Vec<Json>,
item_type: Option<&ConcreteDataType>,
item_type: Option<&JsonNativeType>,
context: &JsonContext<'a>,
) -> Result<JsonValue, Error> {
let json_array_len = json_array.len();
@@ -253,7 +257,7 @@ fn encode_json_array_with_context<'a>(
let array_context = context.with_key(&index.to_string());
let item_value =
encode_json_value_with_context(value, element_type.as_ref(), &array_context)?;
let item_type = item_value.json_type().native_type();
let item_type = item_value.json_type().native_type().clone();
items.push(item_value.into_variant());
// Determine the common type for the list
@@ -280,7 +284,7 @@ fn encode_json_array_with_context<'a>(
/// Helper function to encode a JSON value to a Value and determine its ConcreteDataType with context
fn encode_json_value_with_context<'a>(
json: Json,
expected_type: Option<&ConcreteDataType>,
expected_type: Option<&JsonNativeType>,
context: &JsonContext<'a>,
) -> Result<JsonValue, Error> {
// Check if current key should be treated as unstructured
@@ -634,7 +638,7 @@ fn decode_unstructured_raw_struct(struct_value: StructValue) -> Result<StructVal
/// Helper function to try converting a value to an expected type
fn try_convert_to_expected_type<T>(
value: T,
expected_type: &ConcreteDataType,
expected_type: &JsonNativeType,
) -> Result<JsonValue, Error>
where
T: Into<JsonValue>,
@@ -647,37 +651,33 @@ where
.fail()
};
let actual_type = value.json_type().native_type();
match (&actual_type, expected_type) {
match (actual_type, expected_type) {
(x, y) if x == y => Ok(value),
(ConcreteDataType::UInt64(_), ConcreteDataType::Int64(_)) => {
if let Some(i) = value.as_i64() {
Ok(i.into())
} else {
cast_error()
(JsonNativeType::Number(x), JsonNativeType::Number(y)) => match (x, y) {
(JsonNumberType::U64, JsonNumberType::I64) => {
if let Some(i) = value.as_i64() {
Ok(i.into())
} else {
cast_error()
}
}
}
(ConcreteDataType::UInt64(_), ConcreteDataType::Float64(_)) => {
if let Some(f) = value.as_f64() {
Ok(f.into())
} else {
cast_error()
(JsonNumberType::I64, JsonNumberType::U64) => {
if let Some(i) = value.as_u64() {
Ok(i.into())
} else {
cast_error()
}
}
}
(ConcreteDataType::Int64(_), ConcreteDataType::UInt64(_)) => {
if let Some(i) = value.as_u64() {
Ok(i.into())
} else {
cast_error()
(_, JsonNumberType::F64) => {
if let Some(f) = value.as_f64() {
Ok(f.into())
} else {
cast_error()
}
}
}
(ConcreteDataType::Int64(_), ConcreteDataType::Float64(_)) => {
if let Some(f) = value.as_f64() {
Ok(f.into())
} else {
cast_error()
}
}
(_, ConcreteDataType::String(_)) => Ok(value.to_string().into()),
_ => cast_error(),
},
(_, JsonNativeType::String) => Ok(value.to_string().into()),
_ => cast_error(),
}
}
@@ -688,6 +688,7 @@ mod tests {
use serde_json::json;
use super::*;
use crate::data_type::ConcreteDataType;
use crate::types::ListType;
#[test]
@@ -884,7 +885,7 @@ mod tests {
let json = Json::from(42);
let settings = JsonStructureSettings::Structured(None);
let result = settings
.encode_with_type(json.clone(), Some(&ConcreteDataType::uint64_datatype()))
.encode_with_type(json.clone(), Some(&JsonNativeType::u64()))
.unwrap()
.into_json_inner()
.unwrap();
@@ -892,7 +893,7 @@ mod tests {
// Test with expected string type
let result = settings
.encode_with_type(json, Some(&ConcreteDataType::string_datatype()))
.encode_with_type(json, Some(&JsonNativeType::String))
.unwrap()
.into_json_inner()
.unwrap();
@@ -961,16 +962,10 @@ mod tests {
});
// Define expected struct type
let fields = vec![
StructField::new(
"name".to_string(),
ConcreteDataType::string_datatype(),
true,
),
StructField::new("age".to_string(), ConcreteDataType::int64_datatype(), true),
];
let struct_type = StructType::new(Arc::new(fields));
let concrete_type = ConcreteDataType::Struct(struct_type);
let concrete_type = JsonNativeType::Object(JsonObjectType::from([
("name".to_string(), JsonNativeType::String),
("age".to_string(), JsonNativeType::i64()),
]));
let settings = JsonStructureSettings::Structured(None);
let result = settings
@@ -1006,28 +1001,15 @@ mod tests {
});
// Define schema with specific field order
let fields = vec![
StructField::new(
"a_field".to_string(),
ConcreteDataType::string_datatype(),
true,
),
StructField::new(
"m_field".to_string(),
ConcreteDataType::string_datatype(),
true,
),
StructField::new(
"z_field".to_string(),
ConcreteDataType::string_datatype(),
true,
),
];
let struct_type = StructType::new(Arc::new(fields));
let json_type = JsonObjectType::from([
("a_field".to_string(), JsonNativeType::String),
("m_field".to_string(), JsonNativeType::String),
("z_field".to_string(), JsonNativeType::String),
]);
let Value::Struct(result) = encode_json_object_with_context(
json.as_object().unwrap().clone(),
Some(&struct_type),
Some(&json_type),
&JsonContext {
key_path: String::new(),
settings: &JsonStructureSettings::Structured(None),
@@ -1060,19 +1042,14 @@ mod tests {
});
// Define schema with only name and age
let fields = vec![
StructField::new(
"name".to_string(),
ConcreteDataType::string_datatype(),
true,
),
StructField::new("age".to_string(), ConcreteDataType::int64_datatype(), true),
];
let struct_type = StructType::new(Arc::new(fields));
let json_type = JsonObjectType::from([
("name".to_string(), JsonNativeType::String),
("age".to_string(), JsonNativeType::i64()),
]);
let Value::Struct(result) = encode_json_object_with_context(
json.as_object().unwrap().clone(),
Some(&struct_type),
Some(&json_type),
&JsonContext {
key_path: String::new(),
settings: &JsonStructureSettings::Structured(None),
@@ -1104,19 +1081,14 @@ mod tests {
});
// Define schema with name and age
let fields = vec![
StructField::new(
"name".to_string(),
ConcreteDataType::string_datatype(),
true,
),
StructField::new("age".to_string(), ConcreteDataType::int64_datatype(), true),
];
let struct_type = StructType::new(Arc::new(fields));
let json_type = JsonObjectType::from([
("name".to_string(), JsonNativeType::String),
("age".to_string(), JsonNativeType::i64()),
]);
let Value::Struct(result) = encode_json_object_with_context(
json.as_object().unwrap().clone(),
Some(&struct_type),
Some(&json_type),
&JsonContext {
key_path: String::new(),
settings: &JsonStructureSettings::Structured(None),
@@ -1159,11 +1131,12 @@ mod tests {
fn test_encode_json_array_with_item_type() {
let json = json!([1, 2, 3]);
let item_type = Arc::new(ConcreteDataType::uint64_datatype());
let list_type = ListType::new(item_type.clone());
let concrete_type = ConcreteDataType::List(list_type);
let settings = JsonStructureSettings::Structured(None);
let result = settings
.encode_with_type(json, Some(&concrete_type))
.encode_with_type(
json,
Some(&JsonNativeType::Array(Box::new(JsonNativeType::u64()))),
)
.unwrap()
.into_json_inner()
.unwrap();
@@ -1183,11 +1156,12 @@ mod tests {
fn test_encode_json_array_empty_with_item_type() {
let json = json!([]);
let item_type = Arc::new(ConcreteDataType::null_datatype());
let list_type = ListType::new(item_type.clone());
let concrete_type = ConcreteDataType::List(list_type);
let settings = JsonStructureSettings::Structured(None);
let result = settings
.encode_with_type(json, Some(&concrete_type))
.encode_with_type(
json,
Some(&JsonNativeType::Array(Box::new(JsonNativeType::Null))),
)
.unwrap()
.into_json_inner()
.unwrap();
@@ -1457,7 +1431,7 @@ mod tests {
// Test encoding JSON number with expected int64 type
let json = Json::from(42);
let result = settings
.encode_with_type(json, Some(&ConcreteDataType::int64_datatype()))
.encode_with_type(json, Some(&JsonNativeType::i64()))
.unwrap()
.into_json_inner()
.unwrap();
@@ -1466,7 +1440,7 @@ mod tests {
// Test encoding JSON string with expected string type
let json = Json::String("hello".to_string());
let result = settings
.encode_with_type(json, Some(&ConcreteDataType::string_datatype()))
.encode_with_type(json, Some(&JsonNativeType::String))
.unwrap()
.into_json_inner()
.unwrap();
@@ -1475,7 +1449,7 @@ mod tests {
// Test encoding JSON boolean with expected boolean type
let json = Json::Bool(true);
let result = settings
.encode_with_type(json, Some(&ConcreteDataType::boolean_datatype()))
.encode_with_type(json, Some(&JsonNativeType::Bool))
.unwrap()
.into_json_inner()
.unwrap();
@@ -1487,12 +1461,12 @@ mod tests {
// Test encoding JSON number with mismatched string type
let json = Json::from(42);
let settings = JsonStructureSettings::Structured(None);
let result = settings.encode_with_type(json, Some(&ConcreteDataType::string_datatype()));
let result = settings.encode_with_type(json, Some(&JsonNativeType::String));
assert!(result.is_ok()); // Should succeed due to type conversion
// Test encoding JSON object with mismatched non-struct type
let json = json!({"name": "test"});
let result = settings.encode_with_type(json, Some(&ConcreteDataType::int64_datatype()));
let result = settings.encode_with_type(json, Some(&JsonNativeType::i64()));
assert!(result.is_err()); // Should fail - object can't be converted to int64
}
@@ -1500,12 +1474,13 @@ mod tests {
fn test_encode_json_array_with_list_type() {
let json = json!([1, 2, 3]);
let item_type = Arc::new(ConcreteDataType::int64_datatype());
let list_type = ListType::new(item_type.clone());
let concrete_type = ConcreteDataType::List(list_type);
let settings = JsonStructureSettings::Structured(None);
let result = settings
.encode_with_type(json, Some(&concrete_type))
.encode_with_type(
json,
Some(&JsonNativeType::Array(Box::new(JsonNativeType::i64()))),
)
.unwrap()
.into_json_inner()
.unwrap();
@@ -1527,7 +1502,7 @@ mod tests {
let json = Json::Null;
let settings = JsonStructureSettings::Structured(None);
let result = settings
.encode_with_type(json.clone(), Some(&ConcreteDataType::null_datatype()))
.encode_with_type(json.clone(), Some(&JsonNativeType::Null))
.unwrap()
.into_json_inner()
.unwrap();
@@ -1536,7 +1511,7 @@ mod tests {
// Test float with float64 type
let json = Json::from(3.15);
let result = settings
.encode_with_type(json, Some(&ConcreteDataType::float64_datatype()))
.encode_with_type(json, Some(&JsonNativeType::f64()))
.unwrap()
.into_json_inner()
.unwrap();
@@ -1628,20 +1603,11 @@ mod tests {
}
// Test with encode_with_type (with type)
let struct_type = StructType::new(Arc::new(vec![
StructField::new(
"name".to_string(),
ConcreteDataType::string_datatype(),
true,
),
StructField::new("age".to_string(), ConcreteDataType::int64_datatype(), true),
StructField::new(
"active".to_string(),
ConcreteDataType::boolean_datatype(),
true,
),
let concrete_type = JsonNativeType::Object(JsonObjectType::from([
("name".to_string(), JsonNativeType::String),
("age".to_string(), JsonNativeType::i64()),
("active".to_string(), JsonNativeType::Bool),
]));
let concrete_type = ConcreteDataType::Struct(struct_type);
let result2 = settings
.encode_with_type(json, Some(&concrete_type))

View File

@@ -23,6 +23,7 @@ use serde::{Deserialize, Serialize};
use serde_json::Number;
use crate::data_type::ConcreteDataType;
use crate::types::json_type::JsonNativeType;
use crate::types::{JsonType, StructField, StructType};
use crate::value::{ListValue, ListValueRef, StructValue, StructValueRef, Value, ValueRef};
@@ -108,40 +109,37 @@ pub enum JsonVariant {
}
impl JsonVariant {
fn json_type(&self) -> JsonType {
fn native_type(&self) -> JsonNativeType {
match self {
JsonVariant::Null => JsonType::new_native(ConcreteDataType::null_datatype()),
JsonVariant::Bool(_) => JsonType::new_native(ConcreteDataType::boolean_datatype()),
JsonVariant::Number(n) => JsonType::new_native(match n {
JsonNumber::PosInt(_) => ConcreteDataType::uint64_datatype(),
JsonNumber::NegInt(_) => ConcreteDataType::int64_datatype(),
JsonNumber::Float(_) => ConcreteDataType::float64_datatype(),
}),
JsonVariant::String(_) => JsonType::new_native(ConcreteDataType::string_datatype()),
JsonVariant::Null => JsonNativeType::Null,
JsonVariant::Bool(_) => JsonNativeType::Bool,
JsonVariant::Number(n) => match n {
JsonNumber::PosInt(_) => JsonNativeType::u64(),
JsonNumber::NegInt(_) => JsonNativeType::i64(),
JsonNumber::Float(_) => JsonNativeType::f64(),
},
JsonVariant::String(_) => JsonNativeType::String,
JsonVariant::Array(array) => {
let item_type = if let Some(first) = array.first() {
first.json_type().native_type()
first.native_type()
} else {
ConcreteDataType::null_datatype()
JsonNativeType::Null
};
JsonType::new_native(ConcreteDataType::list_datatype(Arc::new(item_type)))
}
JsonVariant::Object(object) => {
let mut fields = Vec::with_capacity(object.len());
for (k, v) in object.iter() {
fields.push(StructField::new(
k.clone(),
v.json_type().native_type(),
true,
))
}
JsonType::new_native(ConcreteDataType::struct_datatype(StructType::new(
Arc::new(fields),
)))
JsonNativeType::Array(Box::new(item_type))
}
JsonVariant::Object(object) => JsonNativeType::Object(
object
.iter()
.map(|(k, v)| (k.clone(), v.native_type()))
.collect(),
),
}
}
fn json_type(&self) -> JsonType {
JsonType::new_native(self.native_type())
}
fn as_ref(&self) -> JsonVariantRef<'_> {
match self {
JsonVariant::Null => JsonVariantRef::Null,
@@ -333,20 +331,20 @@ impl JsonValue {
JsonVariant::String(x) => Value::String(x.into()),
JsonVariant::Array(array) => {
let item_type = if let Some(first) = array.first() {
first.json_type().native_type()
first.native_type()
} else {
ConcreteDataType::null_datatype()
JsonNativeType::Null
};
Value::List(ListValue::new(
array.into_iter().map(helper).collect(),
Arc::new(item_type),
Arc::new((&item_type).into()),
))
}
JsonVariant::Object(object) => {
let mut fields = Vec::with_capacity(object.len());
let mut items = Vec::with_capacity(object.len());
for (k, v) in object {
fields.push(StructField::new(k, v.json_type().native_type(), true));
fields.push(StructField::new(k, (&v.native_type()).into(), true));
items.push(helper(v));
}
Value::Struct(StructValue::new(items, StructType::new(Arc::new(fields))))
@@ -448,37 +446,33 @@ pub enum JsonVariantRef<'a> {
impl JsonVariantRef<'_> {
fn json_type(&self) -> JsonType {
match self {
JsonVariantRef::Null => JsonType::new_native(ConcreteDataType::null_datatype()),
JsonVariantRef::Bool(_) => JsonType::new_native(ConcreteDataType::boolean_datatype()),
JsonVariantRef::Number(n) => JsonType::new_native(match n {
JsonNumber::PosInt(_) => ConcreteDataType::uint64_datatype(),
JsonNumber::NegInt(_) => ConcreteDataType::int64_datatype(),
JsonNumber::Float(_) => ConcreteDataType::float64_datatype(),
}),
JsonVariantRef::String(_) => JsonType::new_native(ConcreteDataType::string_datatype()),
JsonVariantRef::Array(array) => {
let item_type = if let Some(first) = array.first() {
first.json_type().native_type()
} else {
ConcreteDataType::null_datatype()
};
JsonType::new_native(ConcreteDataType::list_datatype(Arc::new(item_type)))
}
JsonVariantRef::Object(object) => {
let mut fields = Vec::with_capacity(object.len());
for (k, v) in object.iter() {
fields.push(StructField::new(
k.to_string(),
v.json_type().native_type(),
true,
))
fn native_type(v: &JsonVariantRef<'_>) -> JsonNativeType {
match v {
JsonVariantRef::Null => JsonNativeType::Null,
JsonVariantRef::Bool(_) => JsonNativeType::Bool,
JsonVariantRef::Number(n) => match n {
JsonNumber::PosInt(_) => JsonNativeType::u64(),
JsonNumber::NegInt(_) => JsonNativeType::i64(),
JsonNumber::Float(_) => JsonNativeType::f64(),
},
JsonVariantRef::String(_) => JsonNativeType::String,
JsonVariantRef::Array(array) => {
let item_type = if let Some(first) = array.first() {
native_type(first)
} else {
JsonNativeType::Null
};
JsonNativeType::Array(Box::new(item_type))
}
JsonType::new_native(ConcreteDataType::struct_datatype(StructType::new(
Arc::new(fields),
)))
JsonVariantRef::Object(object) => JsonNativeType::Object(
object
.iter()
.map(|(k, v)| (k.to_string(), native_type(v)))
.collect(),
),
}
}
JsonType::new_native(native_type(self))
}
}

View File

@@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, HashMap};
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};
use std::str::FromStr;
use std::sync::Arc;
@@ -38,11 +39,127 @@ pub const JSON_TYPE_NAME: &str = "Json";
const JSON_PLAIN_FIELD_NAME: &str = "__plain__";
const JSON_PLAIN_FIELD_METADATA_KEY: &str = "is_plain_json";
pub type JsonObjectType = BTreeMap<String, JsonNativeType>;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub enum JsonNumberType {
U64,
I64,
F64,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub enum JsonNativeType {
Null,
Bool,
Number(JsonNumberType),
String,
Array(Box<JsonNativeType>),
Object(JsonObjectType),
}
impl JsonNativeType {
pub fn u64() -> Self {
Self::Number(JsonNumberType::U64)
}
pub fn i64() -> Self {
Self::Number(JsonNumberType::I64)
}
pub fn f64() -> Self {
Self::Number(JsonNumberType::F64)
}
}
impl From<&JsonNativeType> for ConcreteDataType {
fn from(value: &JsonNativeType) -> Self {
match value {
JsonNativeType::Null => ConcreteDataType::null_datatype(),
JsonNativeType::Bool => ConcreteDataType::boolean_datatype(),
JsonNativeType::Number(JsonNumberType::U64) => ConcreteDataType::uint64_datatype(),
JsonNativeType::Number(JsonNumberType::I64) => ConcreteDataType::int64_datatype(),
JsonNativeType::Number(JsonNumberType::F64) => ConcreteDataType::float64_datatype(),
JsonNativeType::String => ConcreteDataType::string_datatype(),
JsonNativeType::Array(item_type) => {
ConcreteDataType::List(ListType::new(Arc::new(item_type.as_ref().into())))
}
JsonNativeType::Object(object) => {
let fields = object
.iter()
.map(|(type_name, field_type)| {
StructField::new(type_name.clone(), field_type.into(), true)
})
.collect();
ConcreteDataType::Struct(StructType::new(Arc::new(fields)))
}
}
}
}
impl From<&ConcreteDataType> for JsonNativeType {
fn from(value: &ConcreteDataType) -> Self {
match value {
ConcreteDataType::Null(_) => JsonNativeType::Null,
ConcreteDataType::Boolean(_) => JsonNativeType::Bool,
ConcreteDataType::UInt64(_)
| ConcreteDataType::UInt32(_)
| ConcreteDataType::UInt16(_)
| ConcreteDataType::UInt8(_) => JsonNativeType::u64(),
ConcreteDataType::Int64(_)
| ConcreteDataType::Int32(_)
| ConcreteDataType::Int16(_)
| ConcreteDataType::Int8(_) => JsonNativeType::i64(),
ConcreteDataType::Float64(_) | ConcreteDataType::Float32(_) => JsonNativeType::f64(),
ConcreteDataType::String(_) => JsonNativeType::String,
ConcreteDataType::List(list_type) => {
JsonNativeType::Array(Box::new(list_type.item_type().into()))
}
ConcreteDataType::Struct(struct_type) => JsonNativeType::Object(
struct_type
.fields()
.iter()
.map(|field| (field.name().to_string(), field.data_type().into()))
.collect(),
),
ConcreteDataType::Json(json_type) => json_type.native_type().clone(),
_ => unreachable!(),
}
}
}
impl Display for JsonNativeType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
JsonNativeType::Null => write!(f, "Null"),
JsonNativeType::Bool => write!(f, "Bool"),
JsonNativeType::Number(t) => {
write!(f, "Number({t:?})")
}
JsonNativeType::String => write!(f, "String"),
JsonNativeType::Array(item_type) => {
write!(f, "Array[{}]", item_type)
}
JsonNativeType::Object(object) => {
write!(
f,
"Object{{{}}}",
object
.iter()
.map(|(k, v)| format!(r#""{k}": {v}"#))
.collect::<Vec<_>>()
.join(", ")
)
}
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, Default)]
pub enum JsonFormat {
#[default]
Jsonb,
Native(Box<ConcreteDataType>),
Native(Box<JsonNativeType>),
}
/// JsonType is a data type for JSON data. It is stored as binary data of jsonb format.
@@ -57,22 +174,22 @@ impl JsonType {
Self { format }
}
pub(crate) fn new_native(native: ConcreteDataType) -> Self {
pub(crate) fn new_native(native: JsonNativeType) -> Self {
Self {
format: JsonFormat::Native(Box::new(native)),
}
}
pub(crate) fn native_type(&self) -> ConcreteDataType {
pub(crate) fn native_type(&self) -> &JsonNativeType {
match &self.format {
JsonFormat::Jsonb => ConcreteDataType::binary_datatype(),
JsonFormat::Native(x) => x.as_ref().clone(),
JsonFormat::Jsonb => &JsonNativeType::String,
JsonFormat::Native(x) => x.as_ref(),
}
}
pub(crate) fn empty() -> Self {
Self {
format: JsonFormat::Native(Box::new(ConcreteDataType::null_datatype())),
format: JsonFormat::Native(Box::new(JsonNativeType::Null)),
}
}
@@ -83,28 +200,13 @@ impl JsonType {
pub(crate) fn as_struct_type(&self) -> StructType {
match &self.format {
JsonFormat::Jsonb => StructType::default(),
JsonFormat::Native(inner) => match inner.as_ref() {
JsonFormat::Native(inner) => match ConcreteDataType::from(inner.as_ref()) {
ConcreteDataType::Struct(t) => t.clone(),
x => plain_json_struct_type(x.clone()),
x => plain_json_struct_type(x),
},
}
}
/// Check if this json type is the special "plain" one.
/// See [JsonType::as_struct_type].
#[expect(unused)]
pub(crate) fn is_plain_json(&self) -> bool {
let JsonFormat::Native(box ConcreteDataType::Struct(t)) = &self.format else {
return true;
};
let fields = t.fields();
let Some((single, [])) = fields.split_first() else {
return false;
};
single.name() == JSON_PLAIN_FIELD_NAME
&& single.metadata(JSON_PLAIN_FIELD_METADATA_KEY) == Some("true")
}
/// Try to merge this json type with others, error on datatype conflict.
pub(crate) fn merge(&mut self, other: &JsonType) -> Result<()> {
match (&self.format, &other.format) {
@@ -140,17 +242,11 @@ pub(crate) fn plain_json_struct_type(item_type: ConcreteDataType) -> StructType
StructType::new(Arc::new(vec![field]))
}
fn is_mergeable(this: &ConcreteDataType, that: &ConcreteDataType) -> bool {
fn is_mergeable_struct(this: &StructType, that: &StructType) -> bool {
let this_fields = this.fields();
let this_fields = this_fields
.iter()
.map(|x| (x.name(), x))
.collect::<HashMap<_, _>>();
for that_field in that.fields().iter() {
if let Some(this_field) = this_fields.get(that_field.name())
&& !is_mergeable(this_field.data_type(), that_field.data_type())
fn is_mergeable(this: &JsonNativeType, that: &JsonNativeType) -> bool {
fn is_mergeable_object(this: &JsonObjectType, that: &JsonObjectType) -> bool {
for (type_name, that_type) in that {
if let Some(this_type) = this.get(type_name)
&& !is_mergeable(this_type, that_type)
{
return false;
}
@@ -160,27 +256,41 @@ fn is_mergeable(this: &ConcreteDataType, that: &ConcreteDataType) -> bool {
match (this, that) {
(this, that) if this == that => true,
(ConcreteDataType::List(this), ConcreteDataType::List(that)) => {
is_mergeable(this.item_type(), that.item_type())
(JsonNativeType::Array(this), JsonNativeType::Array(that)) => {
is_mergeable(this.as_ref(), that.as_ref())
}
(ConcreteDataType::Struct(this), ConcreteDataType::Struct(that)) => {
is_mergeable_struct(this, that)
(JsonNativeType::Object(this), JsonNativeType::Object(that)) => {
is_mergeable_object(this, that)
}
(ConcreteDataType::Null(_), _) | (_, ConcreteDataType::Null(_)) => true,
(JsonNativeType::Null, _) | (_, JsonNativeType::Null) => true,
_ => false,
}
}
fn merge(this: &ConcreteDataType, that: &ConcreteDataType) -> Result<ConcreteDataType> {
fn merge(this: &JsonNativeType, that: &JsonNativeType) -> Result<JsonNativeType> {
fn merge_object(this: &JsonObjectType, that: &JsonObjectType) -> Result<JsonObjectType> {
let mut this = this.clone();
// merge "that" into "this" directly:
for (type_name, that_type) in that {
if let Some(this_type) = this.get_mut(type_name) {
let merged_type = merge(this_type, that_type)?;
*this_type = merged_type;
} else {
this.insert(type_name.clone(), that_type.clone());
}
}
Ok(this)
}
match (this, that) {
(this, that) if this == that => Ok(this.clone()),
(ConcreteDataType::List(this), ConcreteDataType::List(that)) => {
merge_list(this, that).map(ConcreteDataType::List)
(JsonNativeType::Array(this), JsonNativeType::Array(that)) => {
merge(this.as_ref(), that.as_ref()).map(|x| JsonNativeType::Array(Box::new(x)))
}
(ConcreteDataType::Struct(this), ConcreteDataType::Struct(that)) => {
merge_struct(this, that).map(ConcreteDataType::Struct)
(JsonNativeType::Object(this), JsonNativeType::Object(that)) => {
merge_object(this, that).map(JsonNativeType::Object)
}
(ConcreteDataType::Null(_), x) | (x, ConcreteDataType::Null(_)) => Ok(x.clone()),
(JsonNativeType::Null, x) | (x, JsonNativeType::Null) => Ok(x.clone()),
_ => MergeJsonDatatypeSnafu {
reason: format!("datatypes have conflict, this: {this}, that: {that}"),
}
@@ -188,38 +298,6 @@ fn merge(this: &ConcreteDataType, that: &ConcreteDataType) -> Result<ConcreteDat
}
}
fn merge_list(this: &ListType, that: &ListType) -> Result<ListType> {
let merged = merge(this.item_type(), that.item_type())?;
Ok(ListType::new(Arc::new(merged)))
}
fn merge_struct(this: &StructType, that: &StructType) -> Result<StructType> {
let this = Arc::unwrap_or_clone(this.fields());
let that = Arc::unwrap_or_clone(that.fields());
let mut this: BTreeMap<String, StructField> = this
.into_iter()
.map(|x| (x.name().to_string(), x))
.collect();
// merge "that" into "this" directly:
for that_field in that {
let field_name = that_field.name().to_string();
if let Some(this_field) = this.get(&field_name) {
let merged_field = StructField::new(
field_name.clone(),
merge(this_field.data_type(), that_field.data_type())?,
true, // the value in json object must be always nullable
);
this.insert(field_name, merged_field);
} else {
this.insert(field_name, that_field);
}
}
let fields = this.into_values().collect::<Vec<_>>();
Ok(StructType::new(Arc::new(fields)))
}
impl DataType for JsonType {
fn name(&self) -> String {
match &self.format {
@@ -319,9 +397,7 @@ mod tests {
Ok(())
}
let json_type = &mut JsonType::new(JsonFormat::Native(Box::new(
ConcreteDataType::null_datatype(),
)));
let json_type = &mut JsonType::new_native(JsonNativeType::Null);
// can merge with json object:
let json = r#"{
@@ -329,16 +405,15 @@ mod tests {
"list": [1, 2, 3],
"object": {"a": 1}
}"#;
let expected =
r#"Json<Struct<"hello": String, "list": List<Int64>, "object": Struct<"a": Int64>>>"#;
let expected = r#"Json<Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}>"#;
test(json, json_type, Ok(expected))?;
// cannot merge with other non-object json values:
let jsons = [r#""s""#, "1", "[1]"];
let expects = [
r#"Failed to merge JSON datatype: datatypes have conflict, this: Struct<"hello": String, "list": List<Int64>, "object": Struct<"a": Int64>>, that: String"#,
r#"Failed to merge JSON datatype: datatypes have conflict, this: Struct<"hello": String, "list": List<Int64>, "object": Struct<"a": Int64>>, that: Int64"#,
r#"Failed to merge JSON datatype: datatypes have conflict, this: Struct<"hello": String, "list": List<Int64>, "object": Struct<"a": Int64>>, that: List<Int64>"#,
r#"Failed to merge JSON datatype: datatypes have conflict, this: Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}, that: String"#,
r#"Failed to merge JSON datatype: datatypes have conflict, this: Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}, that: Number(I64)"#,
r#"Failed to merge JSON datatype: datatypes have conflict, this: Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}, that: Array[Number(I64)]"#,
];
for (json, expect) in jsons.into_iter().zip(expects.into_iter()) {
test(json, json_type, Err(expect))?;
@@ -350,8 +425,7 @@ mod tests {
"float": 0.123,
"no": 42
}"#;
let expected =
r#"Failed to merge JSON datatype: datatypes have conflict, this: String, that: Int64"#;
let expected = r#"Failed to merge JSON datatype: datatypes have conflict, this: String, that: Number(I64)"#;
test(json, json_type, Err(expected))?;
// can merge with another json object:
@@ -360,7 +434,7 @@ mod tests {
"float": 0.123,
"int": 42
}"#;
let expected = r#"Json<Struct<"float": Float64, "hello": String, "int": Int64, "list": List<Int64>, "object": Struct<"a": Int64>>>"#;
let expected = r#"Json<Object{"float": Number(F64), "hello": String, "int": Number(I64), "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}>"#;
test(json, json_type, Ok(expected))?;
// can merge with some complex nested json object:
@@ -370,7 +444,7 @@ mod tests {
"float": 0.456,
"int": 0
}"#;
let expected = r#"Json<Struct<"float": Float64, "hello": String, "int": Int64, "list": List<Int64>, "object": Struct<"a": Int64, "foo": String, "l": List<String>, "o": Struct<"key": String>>>>"#;
let expected = r#"Json<Object{"float": Number(F64), "hello": String, "int": Number(I64), "list": Array[Number(I64)], "object": Object{"a": Number(I64), "foo": String, "l": Array[String], "o": Object{"key": String}}}>"#;
test(json, json_type, Ok(expected))?;
Ok(())

View File

@@ -151,6 +151,7 @@ impl StructField {
self.metadata.insert(key.to_string(), value.to_string());
}
#[expect(unused)]
pub(crate) fn metadata(&self, key: &str) -> Option<&str> {
self.metadata.get(key).map(String::as_str)
}

View File

@@ -320,10 +320,10 @@ mod tests {
Ok(()),
Ok(()),
Err(
"Failed to merge JSON datatype: datatypes have conflict, this: Int64, that: String",
"Failed to merge JSON datatype: datatypes have conflict, this: Number(I64), that: String",
),
Err(
"Failed to merge JSON datatype: datatypes have conflict, this: Int64, that: List<Boolean>",
"Failed to merge JSON datatype: datatypes have conflict, this: Number(I64), that: Array[Bool]",
),
];
let mut builder = JsonVectorBuilder::with_capacity(1);
@@ -395,12 +395,12 @@ mod tests {
// test children builders:
assert_eq!(builder.builders.len(), 6);
let expect_types = [
r#"Json<Struct<"list": List<Int64>, "s": String>>"#,
r#"Json<Struct<"float": Float64, "s": String>>"#,
r#"Json<Struct<"float": Float64, "int": Int64>>"#,
r#"Json<Struct<"int": Int64, "object": Struct<"hello": String, "timestamp": Int64>>>"#,
r#"Json<Struct<"nested": Struct<"a": Struct<"b": Struct<"b": Struct<"a": String>>>>, "object": Struct<"timestamp": Int64>>>"#,
r#"Json<Struct<"nested": Struct<"a": Struct<"b": Struct<"a": Struct<"b": String>>>>, "object": Struct<"timestamp": Int64>>>"#,
r#"Json<Object{"list": Array[Number(I64)], "s": String}>"#,
r#"Json<Object{"float": Number(F64), "s": String}>"#,
r#"Json<Object{"float": Number(F64), "int": Number(I64)}>"#,
r#"Json<Object{"int": Number(I64), "object": Object{"hello": String, "timestamp": Number(I64)}}>"#,
r#"Json<Object{"nested": Object{"a": Object{"b": Object{"b": Object{"a": String}}}}, "object": Object{"timestamp": Number(I64)}}>"#,
r#"Json<Object{"nested": Object{"a": Object{"b": Object{"a": Object{"b": String}}}}, "object": Object{"timestamp": Number(I64)}}>"#,
];
let expect_vectors = [
r#"
@@ -455,7 +455,7 @@ mod tests {
}
// test final merged json type:
let expected = r#"Json<Struct<"float": Float64, "int": Int64, "list": List<Int64>, "nested": Struct<"a": Struct<"b": Struct<"a": Struct<"b": String>, "b": Struct<"a": String>>>>, "object": Struct<"hello": String, "timestamp": Int64>, "s": String>>"#;
let expected = r#"Json<Object{"float": Number(F64), "int": Number(I64), "list": Array[Number(I64)], "nested": Object{"a": Object{"b": Object{"a": Object{"b": String}, "b": Object{"a": String}}}}, "object": Object{"hello": String, "timestamp": Number(I64)}, "s": String}>"#;
assert_eq!(builder.data_type().to_string(), expected);
// test final produced vector: