diff --git a/Cargo.lock b/Cargo.lock index 929bb77b23..f86c79aa2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5325,7 +5325,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=d75496d5d09dedcd0edcade57ccf0a522f4393ae#d75496d5d09dedcd0edcade57ccf0a522f4393ae" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a250938d7106b77da0ae915eb0c531411c28cfe3#a250938d7106b77da0ae915eb0c531411c28cfe3" dependencies = [ "prost 0.13.5", "prost-types 0.13.5", diff --git a/Cargo.toml b/Cargo.toml index 1185c9b2d5..40ba49a734 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -147,7 +147,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "d75496d5d09dedcd0edcade57ccf0a522f4393ae" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a250938d7106b77da0ae915eb0c531411c28cfe3" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 868684ab0e..0b70ea865d 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -20,7 +20,9 @@ use common_time::time::Time; use common_time::timestamp::TimeUnit; use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp}; use datatypes::prelude::{ConcreteDataType, ValueRef}; -use datatypes::types::{IntervalType, StructField, StructType, TimeType, TimestampType}; +use datatypes::types::{ + IntervalType, JsonFormat, StructField, StructType, TimeType, TimestampType, +}; use datatypes::value::{ ListValue, ListValueRef, OrderedF32, OrderedF64, StructValue, StructValueRef, Value, }; @@ -31,8 +33,9 @@ use greptime_proto::v1::greptime_request::Request; use greptime_proto::v1::query_request::Query; use greptime_proto::v1::value::ValueData; use greptime_proto::v1::{ - self, ColumnDataTypeExtension, DdlRequest, DecimalTypeExtension, JsonTypeExtension, - ListTypeExtension, QueryRequest, Row, SemanticType, StructTypeExtension, VectorTypeExtension, + self, ColumnDataTypeExtension, DdlRequest, DecimalTypeExtension, JsonNativeTypeExtension, + JsonTypeExtension, ListTypeExtension, QueryRequest, Row, SemanticType, StructTypeExtension, + VectorTypeExtension, }; use paste::paste; use snafu::prelude::*; @@ -104,7 +107,30 @@ impl From for ConcreteDataType { ConcreteDataType::binary_datatype() } } - ColumnDataType::Json => ConcreteDataType::json_datatype(), + ColumnDataType::Json => { + let type_ext = datatype_wrapper + .datatype_ext + .as_ref() + .and_then(|datatype_ext| datatype_ext.type_ext.as_ref()); + match type_ext { + Some(TypeExt::JsonType(_)) => { + // legacy json type + ConcreteDataType::json_datatype() + } + Some(TypeExt::JsonNativeType(type_ext)) => { + // native json type + let inner_type = ColumnDataTypeWrapper { + datatype: type_ext.datatype(), + datatype_ext: type_ext.datatype_extension.clone().map(|d| *d), + }; + ConcreteDataType::json_native_datatype(inner_type.into()) + } + _ => { + // invalid state, type extension is missing or invalid + ConcreteDataType::null_datatype() + } + } + } ColumnDataType::String => ConcreteDataType::string_datatype(), ColumnDataType::Date => ConcreteDataType::date_datatype(), ColumnDataType::Datetime => ConcreteDataType::timestamp_microsecond_datatype(), @@ -371,9 +397,28 @@ impl TryFrom for ColumnDataTypeWrapper { })), }) } - ColumnDataType::Json => datatype.as_json().map(|_| ColumnDataTypeExtension { - type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), - }), + ColumnDataType::Json => { + if let Some(json_type) = datatype.as_json() { + match &json_type.format { + JsonFormat::Jsonb => Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), + }), + JsonFormat::Native(inner) => { + let inner_type = ColumnDataTypeWrapper::try_from(*inner.clone())?; + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonNativeType(Box::new( + JsonNativeTypeExtension { + datatype: inner_type.datatype.into(), + datatype_extension: inner_type.datatype_ext.map(Box::new), + }, + ))), + }) + } + } + } else { + None + } + } ColumnDataType::Vector => { datatype .as_vector() @@ -537,7 +582,10 @@ pub fn values_with_capacity(datatype: ColumnDataType, capacity: usize) -> Values ..Default::default() }, ColumnDataType::Json => Values { + // TODO(sunng87): remove this when we finally sunset legacy jsonb string_values: Vec::with_capacity(capacity), + // for native json + json_values: Vec::with_capacity(capacity), ..Default::default() }, ColumnDataType::Vector => Values { @@ -750,6 +798,24 @@ pub fn pb_value_to_value_ref<'a>( }; ValueRef::Struct(struct_value_ref) } + + ValueData::JsonValue(inner_value) => { + let json_datatype_ext = datatype_ext + .as_ref() + .and_then(|ext| { + if let Some(TypeExt::JsonNativeType(l)) = &ext.type_ext { + Some(l) + } else { + None + } + }) + .expect("json value must contain datatype ext"); + + ValueRef::Json(Box::new(pb_value_to_value_ref( + inner_value, + json_datatype_ext.datatype_extension.as_deref(), + ))) + } } } @@ -870,6 +936,9 @@ pub fn to_proto_value(value: Value) -> v1::Value { items: convert_struct_to_pb_values(struct_value), })), }, + Value::Json(v) => v1::Value { + value_data: Some(ValueData::JsonValue(Box::new(to_proto_value(*v)))), + }, Value::Duration(_) => v1::Value { value_data: None }, } } @@ -924,6 +993,7 @@ pub fn proto_value_type(value: &v1::Value) -> Option { ValueData::Decimal128Value(_) => ColumnDataType::Decimal128, ValueData::ListValue(_) => ColumnDataType::List, ValueData::StructValue(_) => ColumnDataType::Struct, + ValueData::JsonValue(_) => ColumnDataType::Json, }; Some(value_type) } @@ -994,6 +1064,9 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue { .collect(); Some(ValueData::StructValue(v1::StructValue { items })) } + Value::Json(inner_value) => Some(ValueData::JsonValue(Box::new(value_to_grpc_value( + *inner_value, + )))), Value::Duration(_) => unreachable!(), }, } @@ -1181,6 +1254,10 @@ mod tests { let values = values_with_capacity(ColumnDataType::Struct, 2); let values = values.struct_values; assert_eq!(2, values.capacity()); + + let values = values_with_capacity(ColumnDataType::Json, 2); + assert_eq!(2, values.json_values.capacity()); + assert_eq!(2, values.string_values.capacity()); } #[test] @@ -1304,6 +1381,54 @@ mod tests { ]) .into() ); + assert_eq!( + ConcreteDataType::json_native_datatype(ConcreteDataType::struct_datatype( + struct_type.clone() + )), + ColumnDataTypeWrapper::new( + ColumnDataType::Json, + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonNativeType(Box::new(JsonNativeTypeExtension { + datatype: ColumnDataType::Struct.into(), + datatype_extension: Some(Box::new(ColumnDataTypeExtension { + type_ext: Some(TypeExt::StructType(StructTypeExtension { + fields: vec![ + v1::StructField { + name: "id".to_string(), + datatype: ColumnDataTypeWrapper::int64_datatype() + .datatype() + .into(), + datatype_extension: None + }, + v1::StructField { + name: "name".to_string(), + datatype: ColumnDataTypeWrapper::string_datatype() + .datatype() + .into(), + datatype_extension: None + }, + v1::StructField { + name: "age".to_string(), + datatype: ColumnDataTypeWrapper::int32_datatype() + .datatype() + .into(), + datatype_extension: None + }, + v1::StructField { + name: "address".to_string(), + datatype: ColumnDataTypeWrapper::string_datatype() + .datatype() + .into(), + datatype_extension: None + } + ] + })) + })) + }))) + }) + ) + .into() + ) } #[test] @@ -1429,7 +1554,71 @@ mod tests { ConcreteDataType::list_datatype(ConcreteDataType::string_datatype()), true ) ])).try_into().expect("Failed to create column datatype from Struct(StructType { fields: [StructField { name: \"a\", data_type: Int64(Int64Type) }, StructField { name: \"a.a\", data_type: List(ListType { item_type: String(StringType) }) }] })") - ) + ); + + let struct_type = StructType::new(vec![ + StructField::new("id".to_string(), ConcreteDataType::int64_datatype(), true), + StructField::new( + "name".to_string(), + ConcreteDataType::string_datatype(), + true, + ), + StructField::new("age".to_string(), ConcreteDataType::int32_datatype(), true), + StructField::new( + "address".to_string(), + ConcreteDataType::string_datatype(), + true, + ), + ]); + assert_eq!( + ColumnDataTypeWrapper::new( + ColumnDataType::Json, + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonNativeType(Box::new(JsonNativeTypeExtension { + datatype: ColumnDataType::Struct.into(), + datatype_extension: Some(Box::new(ColumnDataTypeExtension { + type_ext: Some(TypeExt::StructType(StructTypeExtension { + fields: vec![ + v1::StructField { + name: "id".to_string(), + datatype: ColumnDataTypeWrapper::int64_datatype() + .datatype() + .into(), + datatype_extension: None + }, + v1::StructField { + name: "name".to_string(), + datatype: ColumnDataTypeWrapper::string_datatype() + .datatype() + .into(), + datatype_extension: None + }, + v1::StructField { + name: "age".to_string(), + datatype: ColumnDataTypeWrapper::int32_datatype() + .datatype() + .into(), + datatype_extension: None + }, + v1::StructField { + name: "address".to_string(), + datatype: ColumnDataTypeWrapper::string_datatype() + .datatype() + .into(), + datatype_extension: None + } + ] + })) + })) + }))) + }) + ), + ConcreteDataType::json_native_datatype(ConcreteDataType::struct_datatype( + struct_type.clone() + )) + .try_into() + .expect("failed to convert json type") + ); } #[test] diff --git a/src/common/macro/src/row/schema.rs b/src/common/macro/src/row/schema.rs index 5e296c8600..67848a36a0 100644 --- a/src/common/macro/src/row/schema.rs +++ b/src/common/macro/src/row/schema.rs @@ -90,6 +90,7 @@ fn impl_schema_method(fields: &[ParsedField<'_>]) -> Result { Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::VectorType(VectorTypeExtension { dim: #dim })) }) } } + // TODO(sunng87): revisit all these implementations Some(TypeExt::ListType(ext)) => { let item_type = syn::Ident::new(&ext.datatype.to_string(), ident.span()); quote! { @@ -108,6 +109,12 @@ fn impl_schema_method(fields: &[ParsedField<'_>]) -> Result { Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::StructType(StructTypeExtension { fields: [#(#fields),*] })) }) } } + Some(TypeExt::JsonNativeType(ext)) => { + let inner = syn::Ident::new(&ext.datatype.to_string(), ident.span()); + quote! { + Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::JsonNativeType(JsonNativeTypeExtension { datatype: #inner })) }) + } + } None => { quote! { None } } diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs index c008a9312e..7ae4a419d6 100644 --- a/src/common/recordbatch/src/lib.rs +++ b/src/common/recordbatch/src/lib.rs @@ -33,7 +33,7 @@ use datatypes::arrow::util::pretty; use datatypes::prelude::{ConcreteDataType, VectorRef}; use datatypes::scalars::{ScalarVector, ScalarVectorBuilder}; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; -use datatypes::types::json_type_value_to_string; +use datatypes::types::{JsonFormat, jsonb_to_string}; use datatypes::vectors::{BinaryVector, StringVectorBuilder}; use error::Result; use futures::task::{Context, Poll}; @@ -90,32 +90,34 @@ pub fn map_json_type_to_string( ) -> Result { let mut vectors = Vec::with_capacity(original_schema.column_schemas().len()); for (vector, schema) in batch.columns.iter().zip(original_schema.column_schemas()) { - if let ConcreteDataType::Json(j) = schema.data_type { - let mut string_vector_builder = StringVectorBuilder::with_capacity(vector.len()); - let binary_vector = vector - .as_any() - .downcast_ref::() - .with_context(|| error::DowncastVectorSnafu { - from_type: schema.data_type.clone(), - to_type: ConcreteDataType::binary_datatype(), - })?; - for value in binary_vector.iter_data() { - let Some(value) = value else { - string_vector_builder.push(None); - continue; - }; - let string_value = - json_type_value_to_string(value, &j.format).with_context(|_| { - error::CastVectorSnafu { + if let ConcreteDataType::Json(j) = &schema.data_type { + if matches!(&j.format, JsonFormat::Jsonb) { + let mut string_vector_builder = StringVectorBuilder::with_capacity(vector.len()); + let binary_vector = vector + .as_any() + .downcast_ref::() + .with_context(|| error::DowncastVectorSnafu { + from_type: schema.data_type.clone(), + to_type: ConcreteDataType::binary_datatype(), + })?; + for value in binary_vector.iter_data() { + let Some(value) = value else { + string_vector_builder.push(None); + continue; + }; + let string_value = + jsonb_to_string(value).with_context(|_| error::CastVectorSnafu { from_type: schema.data_type.clone(), to_type: ConcreteDataType::string_datatype(), - } - })?; - string_vector_builder.push(Some(string_value.as_str())); - } + })?; + string_vector_builder.push(Some(string_value.as_str())); + } - let string_vector = string_vector_builder.finish(); - vectors.push(Arc::new(string_vector) as VectorRef); + let string_vector = string_vector_builder.finish(); + vectors.push(Arc::new(string_vector) as VectorRef); + } else { + vectors.push(vector.clone()); + } } else { vectors.push(vector.clone()); } diff --git a/src/common/sql/src/convert.rs b/src/common/sql/src/convert.rs index 690788feef..0ff2e44061 100644 --- a/src/common/sql/src/convert.rs +++ b/src/common/sql/src/convert.rs @@ -16,9 +16,10 @@ use std::str::FromStr; use common_time::Timestamp; use common_time::timezone::Timezone; +use datatypes::json::JsonStructureSettings; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnDefaultConstraint; -use datatypes::types::{parse_string_to_json_type_value, parse_string_to_vector_type_value}; +use datatypes::types::{JsonFormat, parse_string_to_jsonb, parse_string_to_vector_type_value}; use datatypes::value::{OrderedF32, OrderedF64, Value}; use snafu::{OptionExt, ResultExt, ensure}; pub use sqlparser::ast::{ @@ -210,7 +211,8 @@ pub fn sql_value_to_value( | Value::Duration(_) | Value::IntervalYearMonth(_) | Value::IntervalDayTime(_) - | Value::IntervalMonthDayNano(_) => match unary_op { + | Value::IntervalMonthDayNano(_) + | Value::Json(_) => match unary_op { UnaryOperator::Plus => {} UnaryOperator::Minus => { value = value @@ -297,8 +299,21 @@ pub(crate) fn parse_string_to_value( } ConcreteDataType::Binary(_) => Ok(Value::Binary(s.as_bytes().into())), ConcreteDataType::Json(j) => { - let v = parse_string_to_json_type_value(&s, &j.format).context(DatatypeSnafu)?; - Ok(Value::Binary(v.into())) + match &j.format { + JsonFormat::Jsonb => { + let v = parse_string_to_jsonb(&s).context(DatatypeSnafu)?; + Ok(Value::Binary(v.into())) + } + JsonFormat::Native(_inner) => { + // Always use the structured version at this level. + let serde_json_value = + serde_json::from_str(&s).context(DeserializeSnafu { json: s })?; + let json_structure_settings = JsonStructureSettings::Structured(None); + json_structure_settings + .encode(serde_json_value) + .context(DatatypeSnafu) + } + } } ConcreteDataType::Vector(d) => { let v = parse_string_to_vector_type_value(&s, Some(d.dim)).context(DatatypeSnafu)?; diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index 8b97399284..74bb181cb3 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -33,8 +33,8 @@ use crate::types::{ BinaryType, BooleanType, DateType, Decimal128Type, DictionaryType, DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType, DurationSecondType, DurationType, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type, IntervalDayTimeType, - IntervalMonthDayNanoType, IntervalType, IntervalYearMonthType, JsonType, ListType, NullType, - StringType, StructType, TimeMillisecondType, TimeType, TimestampMicrosecondType, + IntervalMonthDayNanoType, IntervalType, IntervalYearMonthType, JsonFormat, JsonType, ListType, + NullType, StringType, StructType, TimeMillisecondType, TimeType, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, TimestampType, UInt8Type, UInt16Type, UInt32Type, UInt64Type, VectorType, }; @@ -350,7 +350,7 @@ impl ConcreteDataType { pub fn as_json(&self) -> Option { match self { - ConcreteDataType::Json(j) => Some(*j), + ConcreteDataType::Json(j) => Some(j.clone()), _ => None, } } @@ -668,6 +668,10 @@ impl ConcreteDataType { pub fn vector_default_datatype() -> ConcreteDataType { Self::vector_datatype(0) } + + pub fn json_native_datatype(inner_type: ConcreteDataType) -> ConcreteDataType { + ConcreteDataType::Json(JsonType::new(JsonFormat::Native(Box::new(inner_type)))) + } } /// Data type abstraction. diff --git a/src/datatypes/src/json.rs b/src/datatypes/src/json.rs index ea74f40c73..96894a1736 100644 --- a/src/datatypes/src/json.rs +++ b/src/datatypes/src/json.rs @@ -87,15 +87,16 @@ impl JsonStructureSettings { decode_struct_with_settings(struct_value, &context) } - /// Encode a serde_json::Value into a StructValue using current settings. + /// Encode a serde_json::Value into a Value::Json using current settings. pub fn encode(&self, json: Json) -> Result { let context = JsonContext { key_path: String::new(), settings: self, }; - encode_json_with_context(json, None, &context) + encode_json_with_context(json, None, &context).map(|v| Value::Json(Box::new(v))) } + /// Encode a serde_json::Value into a Value::Json with given data type. pub fn encode_with_type( &self, json: Json, @@ -105,7 +106,7 @@ impl JsonStructureSettings { key_path: String::new(), settings: self, }; - encode_json_with_context(json, data_type, &context) + encode_json_with_context(json, data_type, &context).map(|v| Value::Json(Box::new(v))) } } @@ -394,6 +395,7 @@ pub fn decode_value_with_context<'a>( } match value { + Value::Json(inner) => decode_value_with_context(*inner, context), Value::Struct(struct_value) => decode_struct_with_context(struct_value, context), Value::List(list_value) => decode_list_with_context(list_value, context), _ => decode_primitive_value(value), @@ -507,7 +509,7 @@ fn decode_primitive_value(value: Value) -> Result { } Value::Duration(v) => serde_json::to_value(v.value()).context(error::SerializeSnafu), Value::Decimal128(v) => serde_json::to_value(v.to_string()).context(error::SerializeSnafu), - Value::Struct(_) | Value::List(_) => { + Value::Struct(_) | Value::List(_) | Value::Json(_) => { // These should be handled by the context-aware functions Err(error::InvalidJsonSnafu { value: "Structured values should be handled by context-aware decoding".to_string(), @@ -694,7 +696,11 @@ mod tests { fn test_encode_json_null() { let json = Json::Null; let settings = JsonStructureSettings::Structured(None); - let result = settings.encode_with_type(json, None).unwrap(); + let result = settings + .encode_with_type(json, None) + .unwrap() + .into_json_inner() + .unwrap(); assert_eq!(result, Value::Null); } @@ -702,7 +708,11 @@ mod tests { fn test_encode_json_boolean() { let json = Json::Bool(true); let settings = JsonStructureSettings::Structured(None); - let result = settings.encode_with_type(json, None).unwrap(); + let result = settings + .encode_with_type(json, None) + .unwrap() + .into_json_inner() + .unwrap(); assert_eq!(result, Value::Boolean(true)); } @@ -710,7 +720,11 @@ mod tests { fn test_encode_json_number_integer() { let json = Json::from(42); let settings = JsonStructureSettings::Structured(None); - let result = settings.encode_with_type(json, None).unwrap(); + let result = settings + .encode_with_type(json, None) + .unwrap() + .into_json_inner() + .unwrap(); assert_eq!(result, Value::Int64(42)); } @@ -718,7 +732,11 @@ mod tests { fn test_encode_json_number_float() { let json = Json::from(3.15); let settings = JsonStructureSettings::Structured(None); - let result = settings.encode_with_type(json, None).unwrap(); + let result = settings + .encode_with_type(json, None) + .unwrap() + .into_json_inner() + .unwrap(); match result { Value::Float64(f) => assert_eq!(f.0, 3.15), _ => panic!("Expected Float64"), @@ -729,7 +747,11 @@ mod tests { fn test_encode_json_string() { let json = Json::String("hello".to_string()); let settings = JsonStructureSettings::Structured(None); - let result = settings.encode_with_type(json, None).unwrap(); + let result = settings + .encode_with_type(json, None) + .unwrap() + .into_json_inner() + .unwrap(); assert_eq!(result, Value::String("hello".into())); } @@ -737,7 +759,11 @@ mod tests { fn test_encode_json_array() { let json = json!([1, 2, 3]); let settings = JsonStructureSettings::Structured(None); - let result = settings.encode_with_type(json, None).unwrap(); + let result = settings + .encode_with_type(json, None) + .unwrap() + .into_json_inner() + .unwrap(); if let Value::List(list_value) = result { assert_eq!(list_value.items().len(), 3); @@ -758,7 +784,11 @@ mod tests { }); let settings = JsonStructureSettings::Structured(None); - let result = settings.encode_with_type(json, None).unwrap(); + let result = settings + .encode_with_type(json, None) + .unwrap() + .into_json_inner() + .unwrap(); let Value::Struct(result) = result else { panic!("Expected Struct value"); }; @@ -804,7 +834,11 @@ mod tests { }); let settings = JsonStructureSettings::Structured(None); - let result = settings.encode_with_type(json, None).unwrap(); + let result = settings + .encode_with_type(json, None) + .unwrap() + .into_json_inner() + .unwrap(); let Value::Struct(result) = result else { panic!("Expected Struct value"); }; @@ -856,12 +890,16 @@ mod tests { let settings = JsonStructureSettings::Structured(None); let result = settings .encode_with_type(json.clone(), Some(&ConcreteDataType::int8_datatype())) + .unwrap() + .into_json_inner() .unwrap(); assert_eq!(result, Value::Int8(42)); // Test with expected string type let result = settings .encode_with_type(json, Some(&ConcreteDataType::string_datatype())) + .unwrap() + .into_json_inner() .unwrap(); assert_eq!(result, Value::String("42".into())); } @@ -870,7 +908,11 @@ mod tests { fn test_encode_json_array_mixed_types() { let json = json!([1, "hello", true, 3.15]); let settings = JsonStructureSettings::Structured(None); - let result = settings.encode_with_type(json, None).unwrap(); + let result = settings + .encode_with_type(json, None) + .unwrap() + .into_json_inner() + .unwrap(); if let Value::List(list_value) = result { assert_eq!(list_value.items().len(), 4); @@ -889,7 +931,11 @@ mod tests { fn test_encode_json_empty_array() { let json = json!([]); let settings = JsonStructureSettings::Structured(None); - let result = settings.encode_with_type(json, None).unwrap(); + let result = settings + .encode_with_type(json, None) + .unwrap() + .into_json_inner() + .unwrap(); if let Value::List(list_value) = result { assert_eq!(list_value.items().len(), 0); @@ -911,7 +957,7 @@ mod tests { }); let settings = JsonStructureSettings::Structured(None); - let result = settings.encode(json).unwrap(); + let result = settings.encode(json).unwrap().into_json_inner().unwrap(); if let Value::Struct(struct_value) = result { assert_eq!(struct_value.items().len(), 2); @@ -950,6 +996,8 @@ mod tests { let settings = JsonStructureSettings::Structured(None); let result = settings .encode_with_type(json, Some(&concrete_type)) + .unwrap() + .into_json_inner() .unwrap(); if let Value::Struct(struct_value) = result { @@ -1110,7 +1158,7 @@ mod tests { }); let settings = JsonStructureSettings::Structured(None); - let result = settings.encode(json).unwrap(); + let result = settings.encode(json).unwrap().into_json_inner().unwrap(); if let Value::Struct(struct_value) = result { assert_eq!(struct_value.items().len(), 2); @@ -1127,6 +1175,8 @@ mod tests { let settings = JsonStructureSettings::Structured(None); let result = settings .encode_with_type(json, Some(&concrete_type)) + .unwrap() + .into_json_inner() .unwrap(); if let Value::List(list_value) = result { @@ -1151,6 +1201,8 @@ mod tests { let settings = JsonStructureSettings::Structured(None); let result = settings .encode_with_type(json, Some(&concrete_type)) + .unwrap() + .into_json_inner() .unwrap(); if let Value::List(list_value) = result { @@ -1420,6 +1472,8 @@ mod tests { let json = Json::from(42); let result = settings .encode_with_type(json, Some(&ConcreteDataType::int64_datatype())) + .unwrap() + .into_json_inner() .unwrap(); assert_eq!(result, Value::Int64(42)); @@ -1427,6 +1481,8 @@ mod tests { let json = Json::String("hello".to_string()); let result = settings .encode_with_type(json, Some(&ConcreteDataType::string_datatype())) + .unwrap() + .into_json_inner() .unwrap(); assert_eq!(result, Value::String("hello".into())); @@ -1434,6 +1490,8 @@ mod tests { let json = Json::Bool(true); let result = settings .encode_with_type(json, Some(&ConcreteDataType::boolean_datatype())) + .unwrap() + .into_json_inner() .unwrap(); assert_eq!(result, Value::Boolean(true)); } @@ -1461,6 +1519,8 @@ mod tests { let settings = JsonStructureSettings::Structured(None); let result = settings .encode_with_type(json, Some(&concrete_type)) + .unwrap() + .into_json_inner() .unwrap(); if let Value::List(list_value) = result { @@ -1484,6 +1544,8 @@ mod tests { let settings = JsonStructureSettings::Structured(None); let result = settings .encode_with_type(json.clone(), Some(&ConcreteDataType::null_datatype())) + .unwrap() + .into_json_inner() .unwrap(); assert_eq!(result, Value::Null); @@ -1491,6 +1553,8 @@ mod tests { let json = Json::from(3.15); let result = settings .encode_with_type(json, Some(&ConcreteDataType::float64_datatype())) + .unwrap() + .into_json_inner() .unwrap(); match result { Value::Float64(f) => assert_eq!(f.0, 3.15), @@ -1503,12 +1567,20 @@ mod tests { // Test unsigned integer that fits in i64 let json = Json::from(u64::MAX / 2); let settings = JsonStructureSettings::Structured(None); - let result = settings.encode_with_type(json, None).unwrap(); + let result = settings + .encode_with_type(json, None) + .unwrap() + .into_json_inner() + .unwrap(); assert_eq!(result, Value::Int64((u64::MAX / 2) as i64)); // Test unsigned integer that exceeds i64 range let json = Json::from(u64::MAX); - let result = settings.encode_with_type(json, None).unwrap(); + let result = settings + .encode_with_type(json, None) + .unwrap() + .into_json_inner() + .unwrap(); assert_eq!(result, Value::UInt64(u64::MAX)); } @@ -1520,7 +1592,7 @@ mod tests { }); let settings = JsonStructureSettings::UnstructuredRaw; - let result = settings.encode(json).unwrap(); + let result = settings.encode(json).unwrap().into_json_inner().unwrap(); if let Value::Struct(struct_value) = result { assert_eq!(struct_value.struct_type().fields().len(), 1); @@ -1553,7 +1625,11 @@ mod tests { let settings = JsonStructureSettings::UnstructuredRaw; // Test with encode (no type) - let result = settings.encode(json.clone()).unwrap(); + let result = settings + .encode(json.clone()) + .unwrap() + .into_json_inner() + .unwrap(); if let Value::Struct(s) = result { if let Value::String(json_str) = &s.items()[0] { let json_str = json_str.as_utf8(); @@ -1585,6 +1661,8 @@ mod tests { let result2 = settings .encode_with_type(json, Some(&concrete_type)) + .unwrap() + .into_json_inner() .unwrap(); if let Value::Struct(s) = result2 { if let Value::String(json_str) = &s.items()[0] { @@ -1609,7 +1687,11 @@ mod tests { } }); - let result3 = settings.encode(nested_json).unwrap(); + let result3 = settings + .encode(nested_json) + .unwrap() + .into_json_inner() + .unwrap(); if let Value::Struct(s) = result3 { if let Value::String(json_str) = &s.items()[0] { let json_str = json_str.as_utf8(); @@ -1627,7 +1709,11 @@ mod tests { // Test with arrays let array_json = json!([1, "hello", true, 3.15]); - let result4 = settings.encode(array_json).unwrap(); + let result4 = settings + .encode(array_json) + .unwrap() + .into_json_inner() + .unwrap(); if let Value::Struct(s) = result4 { if let Value::String(json_str) = &s.items()[0] { let json_str = json_str.as_utf8(); @@ -1659,7 +1745,7 @@ mod tests { fields: None, unstructured_keys, }; - let result = settings.encode(json).unwrap(); + let result = settings.encode(json).unwrap().into_json_inner().unwrap(); if let Value::Struct(struct_value) = result { let items = struct_value.items(); @@ -2160,7 +2246,11 @@ mod tests { }); // Encode the JSON with partial unstructured settings - let encoded_value = settings.encode(original_json).unwrap(); + let encoded_value = settings + .encode(original_json) + .unwrap() + .into_json_inner() + .unwrap(); // Verify encoding worked - metadata and user.profile.settings should be unstructured if let Value::Struct(encoded_struct) = encoded_value { diff --git a/src/datatypes/src/types.rs b/src/datatypes/src/types.rs index 1f28233bb5..eeea086245 100644 --- a/src/datatypes/src/types.rs +++ b/src/datatypes/src/types.rs @@ -44,8 +44,8 @@ pub use interval_type::{ IntervalDayTimeType, IntervalMonthDayNanoType, IntervalType, IntervalYearMonthType, }; pub use json_type::{ - JSON_TYPE_NAME, JsonType, json_type_value_to_serde_json, json_type_value_to_string, - parse_string_to_json_type_value, + JSON_TYPE_NAME, JsonFormat, JsonType, jsonb_to_serde_json, jsonb_to_string, + parse_string_to_jsonb, }; pub use list_type::ListType; pub use null_type::NullType; diff --git a/src/datatypes/src/types/json_type.rs b/src/datatypes/src/types/json_type.rs index f36372d6df..01ec81dd08 100644 --- a/src/datatypes/src/types/json_type.rs +++ b/src/datatypes/src/types/json_type.rs @@ -21,6 +21,7 @@ use snafu::ResultExt; use crate::data_type::DataType; use crate::error::{DeserializeSnafu, InvalidJsonSnafu, InvalidJsonbSnafu, Result}; +use crate::prelude::ConcreteDataType; use crate::scalars::ScalarVectorBuilder; use crate::type_id::LogicalTypeId; use crate::value::Value; @@ -28,19 +29,16 @@ use crate::vectors::{BinaryVectorBuilder, MutableVector}; pub const JSON_TYPE_NAME: &str = "Json"; -#[derive( - Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, Default, -)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, Default)] pub enum JsonFormat { #[default] Jsonb, + Native(Box), } /// JsonType is a data type for JSON data. It is stored as binary data of jsonb format. /// It utilizes current binary value and vector implementation. -#[derive( - Debug, Default, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, -)] +#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] pub struct JsonType { pub format: JsonFormat, } @@ -81,34 +79,26 @@ impl DataType for JsonType { } /// Converts a json type value to string -pub fn json_type_value_to_string(val: &[u8], format: &JsonFormat) -> Result { - match format { - JsonFormat::Jsonb => match jsonb::from_slice(val) { - Ok(jsonb_value) => { - let serialized = jsonb_value.to_string(); - Ok(serialized) - } - Err(e) => InvalidJsonbSnafu { error: e }.fail(), - }, +pub fn jsonb_to_string(val: &[u8]) -> Result { + match jsonb::from_slice(val) { + Ok(jsonb_value) => { + let serialized = jsonb_value.to_string(); + Ok(serialized) + } + Err(e) => InvalidJsonbSnafu { error: e }.fail(), } } /// Converts a json type value to serde_json::Value -pub fn json_type_value_to_serde_json(val: &[u8], format: &JsonFormat) -> Result { - match format { - JsonFormat::Jsonb => { - let json_string = json_type_value_to_string(val, format)?; - serde_json::Value::from_str(json_string.as_str()) - .context(DeserializeSnafu { json: json_string }) - } - } +pub fn jsonb_to_serde_json(val: &[u8]) -> Result { + let json_string = jsonb_to_string(val)?; + serde_json::Value::from_str(json_string.as_str()) + .context(DeserializeSnafu { json: json_string }) } /// Parses a string to a json type value -pub fn parse_string_to_json_type_value(s: &str, format: &JsonFormat) -> Result> { - match format { - JsonFormat::Jsonb => jsonb::parse_value(s.as_bytes()) - .map_err(|_| InvalidJsonSnafu { value: s }.build()) - .map(|json| json.to_vec()), - } +pub fn parse_string_to_jsonb(s: &str) -> Result> { + jsonb::parse_value(s.as_bytes()) + .map_err(|_| InvalidJsonSnafu { value: s }.build()) + .map(|json| json.to_vec()) } diff --git a/src/datatypes/src/types/string_type.rs b/src/datatypes/src/types/string_type.rs index fdca963707..3b148275d3 100644 --- a/src/datatypes/src/types/string_type.rs +++ b/src/datatypes/src/types/string_type.rs @@ -89,6 +89,8 @@ impl DataType for StringType { Value::Duration(v) => Some(Value::String(StringBytes::from(v.to_string()))), Value::Decimal128(v) => Some(Value::String(StringBytes::from(v.to_string()))), + Value::Json(v) => self.try_cast(*v), + // StringBytes is only support for utf-8, Value::Binary and collections are not allowed. Value::Binary(_) | Value::List(_) | Value::Struct(_) => None, } diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index 9c10257266..d85bb01dea 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -81,8 +81,12 @@ pub enum Value { IntervalDayTime(IntervalDayTime), IntervalMonthDayNano(IntervalMonthDayNano), + // Collection types: List(ListValue), Struct(StructValue), + + // Json Logical types: + Json(Box), } impl Display for Value { @@ -144,6 +148,9 @@ impl Display for Value { .join(", "); write!(f, "{{ {items} }}") } + Value::Json(json_data) => { + write!(f, "Json({})", json_data) + } } } } @@ -190,6 +197,7 @@ macro_rules! define_data_type_func { $struct::Struct(struct_value) => { ConcreteDataType::struct_datatype(struct_value.struct_type().clone()) } + $struct::Json(v) => ConcreteDataType::json_native_datatype(v.data_type()), } } }; @@ -200,7 +208,11 @@ impl Value { /// Returns true if this is a null value. pub fn is_null(&self) -> bool { - matches!(self, Value::Null) + match self { + Value::Null => true, + Value::Json(inner) => inner.is_null(), + _ => false, + } } /// Cast itself to [ListValue]. @@ -208,6 +220,7 @@ impl Value { match self { Value::Null => Ok(None), Value::List(v) => Ok(Some(v)), + Value::Json(inner) => inner.as_list(), other => error::CastTypeSnafu { msg: format!("Failed to cast {other:?} to list value"), } @@ -219,6 +232,7 @@ impl Value { match self { Value::Null => Ok(None), Value::Struct(v) => Ok(Some(v)), + Value::Json(inner) => inner.as_struct(), other => error::CastTypeSnafu { msg: format!("Failed to cast {other:?} to struct value"), } @@ -253,6 +267,7 @@ impl Value { Value::Duration(v) => ValueRef::Duration(*v), Value::Decimal128(v) => ValueRef::Decimal128(*v), Value::Struct(v) => ValueRef::Struct(StructValueRef::Ref(v)), + Value::Json(v) => ValueRef::Json(Box::new(v.as_value_ref())), } } @@ -322,6 +337,7 @@ impl Value { Value::UInt8(v) => Some(*v as _), Value::UInt16(v) => Some(*v as _), Value::UInt32(v) => Some(*v as _), + Value::Json(inner) => inner.as_i64(), _ => None, } } @@ -333,6 +349,7 @@ impl Value { Value::UInt16(v) => Some(*v as _), Value::UInt32(v) => Some(*v as _), Value::UInt64(v) => Some(*v), + Value::Json(inner) => inner.as_u64(), _ => None, } } @@ -349,6 +366,7 @@ impl Value { Value::UInt16(v) => Some(*v as _), Value::UInt32(v) => Some(*v as _), Value::UInt64(v) => Some(*v as _), + Value::Json(inner) => inner.as_f64_lossy(), _ => None, } } @@ -365,6 +383,15 @@ impl Value { pub fn as_bool(&self) -> Option { match self { Value::Boolean(b) => Some(*b), + Value::Json(inner) => inner.as_bool(), + _ => None, + } + } + + /// Extract the inner JSON value from a JSON type. + pub fn into_json_inner(self) -> Option { + match self { + Value::Json(v) => Some(*v), _ => None, } } @@ -411,6 +438,7 @@ impl Value { }, Value::Decimal128(_) => LogicalTypeId::Decimal128, Value::Struct(_) => LogicalTypeId::Struct, + Value::Json(_) => LogicalTypeId::Json, } } @@ -420,11 +448,11 @@ impl Value { let value_type_id = self.logical_type_id(); let output_type_id = output_type.logical_type_id(); ensure!( - // Json type leverage Value(Binary) for storage. output_type_id == value_type_id || self.is_null() || (output_type_id == LogicalTypeId::Json - && value_type_id == LogicalTypeId::Binary), + && (value_type_id == LogicalTypeId::Binary + || value_type_id == LogicalTypeId::Json)), error::ToScalarValueSnafu { reason: format!( "expect value to return output_type {output_type_id:?}, actual: {value_type_id:?}", @@ -467,6 +495,7 @@ impl Value { let struct_type = output_type.as_struct().unwrap(); struct_value.try_to_scalar_value(struct_type)? } + Value::Json(v) => v.try_to_scalar_value(output_type)?, }; Ok(scalar_value) @@ -519,6 +548,8 @@ impl Value { Value::IntervalDayTime(x) => Some(Value::IntervalDayTime(x.negative())), Value::IntervalMonthDayNano(x) => Some(Value::IntervalMonthDayNano(x.negative())), + Value::Json(v) => v.try_negative().map(|neg| Value::Json(Box::new(neg))), + Value::Binary(_) | Value::String(_) | Value::Boolean(_) @@ -877,6 +908,7 @@ impl TryFrom for serde_json::Value { .collect::>>()?; serde_json::Value::Object(map) } + Value::Json(v) => serde_json::Value::try_from(*v)?, }; Ok(json_value) @@ -1205,6 +1237,7 @@ impl From> for Value { ValueRef::List(v) => v.to_value(), ValueRef::Decimal128(v) => Value::Decimal128(v), ValueRef::Struct(v) => v.to_value(), + ValueRef::Json(v) => Value::Json(Box::new(Value::from(*v))), } } } @@ -1247,6 +1280,8 @@ pub enum ValueRef<'a> { // Compound types: List(ListValueRef<'a>), Struct(StructValueRef<'a>), + + Json(Box>), } macro_rules! impl_as_for_value_ref { @@ -1271,7 +1306,11 @@ impl<'a> ValueRef<'a> { /// Returns true if this is null. pub fn is_null(&self) -> bool { - matches!(self, ValueRef::Null) + match self { + ValueRef::Null => true, + ValueRef::Json(v) => v.is_null(), + _ => false, + } } /// Cast itself to binary slice. @@ -1648,6 +1687,7 @@ impl ValueRef<'_> { StructValueRef::Ref(val) => val.estimated_size(), StructValueRef::RefList { val, .. } => val.iter().map(|v| v.data_size()).sum(), }, + ValueRef::Json(v) => v.data_size(), } } } @@ -1710,6 +1750,10 @@ pub(crate) mod tests { ScalarValue::Struct(Arc::new(struct_arrow_array)) } + pub fn build_list_type() -> ConcreteDataType { + ConcreteDataType::list_datatype(ConcreteDataType::boolean_datatype()) + } + pub(crate) fn build_list_value() -> ListValue { let items = vec![Value::Boolean(true), Value::Boolean(false)]; ListValue::new(items, ConcreteDataType::boolean_datatype()) @@ -2185,6 +2229,23 @@ pub(crate) mod tests { &ConcreteDataType::struct_datatype(build_struct_type()), &Value::Struct(build_struct_value()), ); + + check_type_and_value( + &ConcreteDataType::json_native_datatype(ConcreteDataType::boolean_datatype()), + &Value::Json(Box::new(Value::Boolean(true))), + ); + + check_type_and_value( + &ConcreteDataType::json_native_datatype(build_list_type()), + &Value::Json(Box::new(Value::List(build_list_value()))), + ); + + check_type_and_value( + &ConcreteDataType::json_native_datatype(ConcreteDataType::struct_datatype( + build_struct_type(), + )), + &Value::Json(Box::new(Value::Struct(build_struct_value()))), + ); } #[test] @@ -2315,7 +2376,35 @@ pub(crate) mod tests { ) .unwrap(); assert_eq!( - serde_json::Value::try_from(Value::Struct(struct_value)).unwrap(), + serde_json::Value::try_from(Value::Struct(struct_value.clone())).unwrap(), + serde_json::json!({ + "num": 42, + "name": "tomcat", + "yes_or_no": true + }) + ); + + // string wrapped in json + assert_eq!( + serde_json::Value::try_from(Value::Json(Box::new(Value::String("hello".into())))) + .unwrap(), + serde_json::json!("hello") + ); + + // list wrapped in json + assert_eq!( + serde_json::Value::try_from(Value::Json(Box::new(Value::List(ListValue::new( + vec![Value::Int32(1), Value::Int32(2), Value::Int32(3),], + ConcreteDataType::int32_datatype() + ))))) + .unwrap(), + serde_json::json!([1, 2, 3]) + ); + + // struct wrapped in json + assert_eq!( + serde_json::Value::try_from(Value::Json(Box::new(Value::Struct(struct_value)))) + .unwrap(), serde_json::json!({ "num": 42, "name": "tomcat", @@ -2327,6 +2416,7 @@ pub(crate) mod tests { #[test] fn test_null_value() { assert!(Value::Null.is_null()); + assert!(Value::Json(Box::new(Value::Null)).is_null()); assert!(!Value::Boolean(true).is_null()); assert!(Value::Null < Value::Boolean(false)); assert!(Value::Boolean(true) > Value::Null); @@ -2405,6 +2495,13 @@ pub(crate) mod tests { ValueRef::Struct(StructValueRef::Ref(&struct_value)), Value::Struct(struct_value.clone()).as_value_ref() ); + + assert_eq!( + ValueRef::Json(Box::new(ValueRef::Struct(StructValueRef::Ref( + &struct_value + )))), + Value::Json(Box::new(Value::Struct(struct_value.clone()))).as_value_ref() + ); } #[test] @@ -2525,6 +2622,11 @@ pub(crate) mod tests { Value::Struct(build_struct_value()).to_string(), "{ id: 1, name: tom, age: 25, address: 94038, awards: Boolean[true, false] }" ); + + assert_eq!( + Value::Json(Box::new(Value::Struct(build_struct_value()))).to_string(), + "Json({ id: 1, name: tom, age: 25, address: 94038, awards: Boolean[true, false] })" + ) } #[test] @@ -3013,6 +3115,13 @@ pub(crate) mod tests { &ValueRef::Struct(StructValueRef::Ref(&build_struct_value())), 15, ); + + check_value_ref_size_eq( + &ValueRef::Json(Box::new(ValueRef::Struct(StructValueRef::Ref( + &build_struct_value(), + )))), + 15, + ); } #[test] diff --git a/src/frontend/src/limiter.rs b/src/frontend/src/limiter.rs index 8c728c42c7..e0e32e6b1b 100644 --- a/src/frontend/src/limiter.rs +++ b/src/frontend/src/limiter.rs @@ -229,6 +229,12 @@ impl Limiter { .unwrap_or(0) }) .sum(), + ValueData::JsonValue(inner) => inner + .as_ref() + .value_data + .as_ref() + .map(Self::size_of_value_data) + .unwrap_or(0), } } } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index e373e0a780..48fa49222b 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -34,7 +34,7 @@ use common_time::timestamp::TimeUnit; use datatypes::data_type::DataType; use datatypes::prelude::ConcreteDataType; use datatypes::schema::SchemaRef; -use datatypes::types::json_type_value_to_serde_json; +use datatypes::types::jsonb_to_serde_json; use event::{LogState, LogValidatorRef}; use futures::FutureExt; use http::{HeaderValue, Method}; @@ -301,11 +301,11 @@ impl HttpRecordsOutput { let schema = &schemas[col_idx]; for row_idx in 0..recordbatch.num_rows() { let value = col.get(row_idx); - let value = if let ConcreteDataType::Json(json_type) = schema.data_type + // TODO(sunng87): is this duplicated with `map_json_type_to_string` in recordbatch? + let value = if let ConcreteDataType::Json(_json_type) = &schema.data_type && let datatypes::value::Value::Binary(bytes) = value { - json_type_value_to_serde_json(bytes.as_ref(), &json_type.format) - .context(ConvertSqlValueSnafu)? + jsonb_to_serde_json(bytes.as_ref()).context(ConvertSqlValueSnafu)? } else { serde_json::Value::try_from(col.get(row_idx)).context(ToJsonSnafu)? }; diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs index 73545db347..58cd3900a2 100644 --- a/src/servers/src/mysql/writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -21,7 +21,7 @@ use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; use common_telemetry::{debug, error}; use datatypes::prelude::{ConcreteDataType, Value}; use datatypes::schema::SchemaRef; -use datatypes::types::json_type_value_to_string; +use datatypes::types::jsonb_to_string; use futures::StreamExt; use itertools::Itertools; use opensrv_mysql::{ @@ -31,7 +31,7 @@ use session::context::QueryContextRef; use snafu::prelude::*; use tokio::io::AsyncWrite; -use crate::error::{self, ConvertSqlValueSnafu, Result}; +use crate::error::{self, ConvertSqlValueSnafu, Result, ToJsonSnafu}; use crate::metrics::*; /// Try to write multiple output to the writer if possible. @@ -212,10 +212,9 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { Value::Float32(v) => row_writer.write_col(v.0)?, Value::Float64(v) => row_writer.write_col(v.0)?, Value::String(v) => row_writer.write_col(v.as_utf8())?, - Value::Binary(v) => match column.data_type { - ConcreteDataType::Json(j) => { - let s = json_type_value_to_string(&v, &j.format) - .context(ConvertSqlValueSnafu)?; + Value::Binary(v) => match &column.data_type { + ConcreteDataType::Json(_j) => { + let s = jsonb_to_string(&v).context(ConvertSqlValueSnafu)?; row_writer.write_col(s)?; } _ => { @@ -248,6 +247,11 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { .map(|(k, v)| format!("{k}: {v}")) .join(", ") ))?, + Value::Json(inner) => { + let json_value = + serde_json::Value::try_from(*inner).context(ToJsonSnafu)?; + row_writer.write_col(json_value.to_string())? + } Value::Time(v) => row_writer .write_col(v.to_timezone_aware_string(Some(&query_context.timezone())))?, Value::Decimal128(v) => row_writer.write_col(v.to_string())?, diff --git a/src/servers/src/postgres/types.rs b/src/servers/src/postgres/types.rs index 2cd161a61b..65faf5041f 100644 --- a/src/servers/src/postgres/types.rs +++ b/src/servers/src/postgres/types.rs @@ -28,7 +28,7 @@ use datatypes::arrow::datatypes::DataType as ArrowDataType; use datatypes::json::JsonStructureSettings; use datatypes::prelude::{ConcreteDataType, Value}; use datatypes::schema::Schema; -use datatypes::types::{IntervalType, TimestampType, json_type_value_to_string}; +use datatypes::types::{IntervalType, JsonFormat, TimestampType, jsonb_to_string}; use datatypes::value::{ListValue, StructValue}; use pgwire::api::Type; use pgwire::api::portal::{Format, Portal}; @@ -90,8 +90,8 @@ fn encode_array( value_list: ListValue, builder: &mut DataRowEncoder, ) -> PgWireResult<()> { - match value_list.datatype() { - &ConcreteDataType::Boolean(_) => { + match &value_list.datatype() { + ConcreteDataType::Boolean(_) => { let array = value_list .items() .iter() @@ -105,7 +105,7 @@ fn encode_array( .collect::>>>()?; builder.encode_field(&array) } - &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => { + ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => { let array = value_list .items() .iter() @@ -122,7 +122,7 @@ fn encode_array( .collect::>>>()?; builder.encode_field(&array) } - &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => { + ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => { let array = value_list .items() .iter() @@ -139,7 +139,7 @@ fn encode_array( .collect::>>>()?; builder.encode_field(&array) } - &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => { + ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => { let array = value_list .items() .iter() @@ -156,7 +156,7 @@ fn encode_array( .collect::>>>()?; builder.encode_field(&array) } - &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => { + ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => { let array = value_list .items() .iter() @@ -173,7 +173,7 @@ fn encode_array( .collect::>>>()?; builder.encode_field(&array) } - &ConcreteDataType::Float32(_) => { + ConcreteDataType::Float32(_) => { let array = value_list .items() .iter() @@ -187,7 +187,7 @@ fn encode_array( .collect::>>>()?; builder.encode_field(&array) } - &ConcreteDataType::Float64(_) => { + ConcreteDataType::Float64(_) => { let array = value_list .items() .iter() @@ -201,7 +201,7 @@ fn encode_array( .collect::>>>()?; builder.encode_field(&array) } - &ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => { + ConcreteDataType::Binary(_) | ConcreteDataType::Vector(_) => { let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output(); match *bytea_output { @@ -241,7 +241,7 @@ fn encode_array( } } } - &ConcreteDataType::String(_) => { + ConcreteDataType::String(_) => { let array = value_list .items() .iter() @@ -255,7 +255,7 @@ fn encode_array( .collect::>>>()?; builder.encode_field(&array) } - &ConcreteDataType::Date(_) => { + ConcreteDataType::Date(_) => { let array = value_list .items() .iter() @@ -279,7 +279,7 @@ fn encode_array( .collect::>>>()?; builder.encode_field(&array) } - &ConcreteDataType::Timestamp(_) => { + ConcreteDataType::Timestamp(_) => { let array = value_list .items() .iter() @@ -305,7 +305,7 @@ fn encode_array( .collect::>>>()?; builder.encode_field(&array) } - &ConcreteDataType::Time(_) => { + ConcreteDataType::Time(_) => { let array = value_list .items() .iter() @@ -319,7 +319,7 @@ fn encode_array( .collect::>>>()?; builder.encode_field(&array) } - &ConcreteDataType::Interval(_) => { + ConcreteDataType::Interval(_) => { let array = value_list .items() .iter() @@ -335,7 +335,7 @@ fn encode_array( .collect::>>>()?; builder.encode_field(&array) } - &ConcreteDataType::Decimal128(_) => { + ConcreteDataType::Decimal128(_) => { let array = value_list .items() .iter() @@ -349,23 +349,42 @@ fn encode_array( .collect::>>>()?; builder.encode_field(&array) } - &ConcreteDataType::Json(j) => { - let array = value_list - .items() - .iter() - .map(|v| match v { - Value::Null => Ok(None), - Value::Binary(v) => { - let s = json_type_value_to_string(v, &j.format).map_err(convert_err)?; - Ok(Some(s)) - } - _ => Err(convert_err(Error::Internal { - err_msg: format!("Invalid list item type, find {v:?}, expected json",), - })), - }) - .collect::>>>()?; - builder.encode_field(&array) - } + ConcreteDataType::Json(j) => match &j.format { + JsonFormat::Jsonb => { + let array = value_list + .take_items() + .into_iter() + .map(|v| match v { + Value::Null => Ok(None), + Value::Binary(v) => { + let s = jsonb_to_string(&v).map_err(convert_err)?; + Ok(Some(s)) + } + + _ => Err(convert_err(Error::Internal { + err_msg: format!("Invalid list item type, find {v:?}, expected json",), + })), + }) + .collect::>>>()?; + builder.encode_field(&array) + } + JsonFormat::Native(_) => { + let array = value_list + .take_items() + .into_iter() + .map(|v| match v { + Value::Null => Ok(None), + Value::Json(inner) => serde_json::Value::try_from(*inner) + .map(Some) + .map_err(|e| PgWireError::ApiError(Box::new(e))), + _ => Err(convert_err(Error::Internal { + err_msg: format!("Invalid list item type, find {v:?}, expected json",), + })), + }) + .collect::>>>()?; + builder.encode_field(&array) + } + }, _ => Err(convert_err(Error::Internal { err_msg: format!( "cannot write array type {:?} in postgres protocol: unimplemented", @@ -396,8 +415,8 @@ pub(super) fn encode_value( Value::Float64(v) => builder.encode_field(&v.0), Value::String(v) => builder.encode_field(&v.as_utf8()), Value::Binary(v) => match datatype { - ConcreteDataType::Json(j) => { - let s = json_type_value_to_string(v.as_ref(), &j.format).map_err(convert_err)?; + ConcreteDataType::Json(_j) => { + let s = jsonb_to_string(v.as_ref()).map_err(convert_err)?; builder.encode_field(&s) } _ => { @@ -452,6 +471,11 @@ pub(super) fn encode_value( }, Value::List(values) => encode_array(query_ctx, values, builder), Value::Struct(values) => encode_struct(query_ctx, values, builder), + Value::Json(inner) => { + let json_value = serde_json::Value::try_from(*inner) + .map_err(|e| PgWireError::ApiError(Box::new(e)))?; + builder.encode_field(&json_value) + } } } @@ -491,9 +515,7 @@ pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result { &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC_ARRAY), &ConcreteDataType::Json(_) => Ok(Type::JSON_ARRAY), &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL_ARRAY), - // TODO(sunng87) we may treat list/array as json directly so we can - // support deeply nested data structures - &ConcreteDataType::Struct(_) => Ok(Type::RECORD_ARRAY), + &ConcreteDataType::Struct(_) => Ok(Type::JSON_ARRAY), &ConcreteDataType::Dictionary(_) | &ConcreteDataType::Vector(_) | &ConcreteDataType::List(_) => server_error::UnsupportedDataTypeSnafu { @@ -508,7 +530,7 @@ pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result { } .fail(), &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL), - &ConcreteDataType::Struct(_) => Ok(Type::RECORD), + &ConcreteDataType::Struct(_) => Ok(Type::JSON), } }