feat: add Value::Json value type (#7083)

* feat: struct value

Signed-off-by: Ning Sun <sunning@greptime.com>

* feat: update for proto module

* feat: wip struct type

* feat: implement more vector operations

* feat: make datatype and api

* feat: resolve some compilation issues

* feat: resolve all compilation issues

* chore: format update

* test: resolve tests

* test: test and refactor value-to-pb

* feat: add more tests and fix for value types

* chore: remove dbg

* feat: test and fix iterator

* fix: resolve struct_type issue

* feat: pgwire 0.33 update

* refactor: use vec for struct items

* feat: conversion from json to value

* feat: add decode function

* fix: lint issue

* feat: update how we encode raw data

* feat: add conversion to fully structured StructValue

* refactor: take owned value in all encode/decode functions

* feat: add pg serialization of structvalue

* chore: toml format

* refactor: adopt new and try_new from struct value

* chore: cleanup residual issues

* docs: docs up

* fix lint issue

* Apply suggestion from @MichaelScofield

Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>

* Apply suggestion from @MichaelScofield

Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>

* Apply suggestion from @MichaelScofield

Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>

* Apply suggestion from @MichaelScofield

Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>

* chore: address review comment especially collection capacity

* refactor: remove unneeded processed keys collection

* feat: Value::Json type

* chore: add some work in progress changes

* feat: adopt new json type

* refactor: limit scope json conversion functions

* fix: self review update

* test: provide tests for value::json

* test: add tests for api/helper

* switch proto to main branch

* fix: implement is_null for ValueRef::Json

---------

Signed-off-by: Ning Sun <sunning@greptime.com>
Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>
This commit is contained in:
Ning Sun
2025-10-16 04:13:12 +08:00
committed by GitHub
parent 8073e552df
commit 145c1024d1
16 changed files with 590 additions and 150 deletions

2
Cargo.lock generated
View File

@@ -5325,7 +5325,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=d75496d5d09dedcd0edcade57ccf0a522f4393ae#d75496d5d09dedcd0edcade57ccf0a522f4393ae"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a250938d7106b77da0ae915eb0c531411c28cfe3#a250938d7106b77da0ae915eb0c531411c28cfe3"
dependencies = [
"prost 0.13.5",
"prost-types 0.13.5",

View File

@@ -147,7 +147,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "d75496d5d09dedcd0edcade57ccf0a522f4393ae" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a250938d7106b77da0ae915eb0c531411c28cfe3" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -20,7 +20,9 @@ use common_time::time::Time;
use common_time::timestamp::TimeUnit;
use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp};
use datatypes::prelude::{ConcreteDataType, ValueRef};
use datatypes::types::{IntervalType, StructField, StructType, TimeType, TimestampType};
use datatypes::types::{
IntervalType, JsonFormat, StructField, StructType, TimeType, TimestampType,
};
use datatypes::value::{
ListValue, ListValueRef, OrderedF32, OrderedF64, StructValue, StructValueRef, Value,
};
@@ -31,8 +33,9 @@ use greptime_proto::v1::greptime_request::Request;
use greptime_proto::v1::query_request::Query;
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{
self, ColumnDataTypeExtension, DdlRequest, DecimalTypeExtension, JsonTypeExtension,
ListTypeExtension, QueryRequest, Row, SemanticType, StructTypeExtension, VectorTypeExtension,
self, ColumnDataTypeExtension, DdlRequest, DecimalTypeExtension, JsonNativeTypeExtension,
JsonTypeExtension, ListTypeExtension, QueryRequest, Row, SemanticType, StructTypeExtension,
VectorTypeExtension,
};
use paste::paste;
use snafu::prelude::*;
@@ -104,7 +107,30 @@ impl From<ColumnDataTypeWrapper> for ConcreteDataType {
ConcreteDataType::binary_datatype()
}
}
ColumnDataType::Json => ConcreteDataType::json_datatype(),
ColumnDataType::Json => {
let type_ext = datatype_wrapper
.datatype_ext
.as_ref()
.and_then(|datatype_ext| datatype_ext.type_ext.as_ref());
match type_ext {
Some(TypeExt::JsonType(_)) => {
// legacy json type
ConcreteDataType::json_datatype()
}
Some(TypeExt::JsonNativeType(type_ext)) => {
// native json type
let inner_type = ColumnDataTypeWrapper {
datatype: type_ext.datatype(),
datatype_ext: type_ext.datatype_extension.clone().map(|d| *d),
};
ConcreteDataType::json_native_datatype(inner_type.into())
}
_ => {
// invalid state, type extension is missing or invalid
ConcreteDataType::null_datatype()
}
}
}
ColumnDataType::String => ConcreteDataType::string_datatype(),
ColumnDataType::Date => ConcreteDataType::date_datatype(),
ColumnDataType::Datetime => ConcreteDataType::timestamp_microsecond_datatype(),
@@ -371,9 +397,28 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
})),
})
}
ColumnDataType::Json => datatype.as_json().map(|_| ColumnDataTypeExtension {
ColumnDataType::Json => {
if let Some(json_type) = datatype.as_json() {
match &json_type.format {
JsonFormat::Jsonb => Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
}),
JsonFormat::Native(inner) => {
let inner_type = ColumnDataTypeWrapper::try_from(*inner.clone())?;
Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonNativeType(Box::new(
JsonNativeTypeExtension {
datatype: inner_type.datatype.into(),
datatype_extension: inner_type.datatype_ext.map(Box::new),
},
))),
})
}
}
} else {
None
}
}
ColumnDataType::Vector => {
datatype
.as_vector()
@@ -537,7 +582,10 @@ pub fn values_with_capacity(datatype: ColumnDataType, capacity: usize) -> Values
..Default::default()
},
ColumnDataType::Json => Values {
// TODO(sunng87): remove this when we finally sunset legacy jsonb
string_values: Vec::with_capacity(capacity),
// for native json
json_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::Vector => Values {
@@ -750,6 +798,24 @@ pub fn pb_value_to_value_ref<'a>(
};
ValueRef::Struct(struct_value_ref)
}
ValueData::JsonValue(inner_value) => {
let json_datatype_ext = datatype_ext
.as_ref()
.and_then(|ext| {
if let Some(TypeExt::JsonNativeType(l)) = &ext.type_ext {
Some(l)
} else {
None
}
})
.expect("json value must contain datatype ext");
ValueRef::Json(Box::new(pb_value_to_value_ref(
inner_value,
json_datatype_ext.datatype_extension.as_deref(),
)))
}
}
}
@@ -870,6 +936,9 @@ pub fn to_proto_value(value: Value) -> v1::Value {
items: convert_struct_to_pb_values(struct_value),
})),
},
Value::Json(v) => v1::Value {
value_data: Some(ValueData::JsonValue(Box::new(to_proto_value(*v)))),
},
Value::Duration(_) => v1::Value { value_data: None },
}
}
@@ -924,6 +993,7 @@ pub fn proto_value_type(value: &v1::Value) -> Option<ColumnDataType> {
ValueData::Decimal128Value(_) => ColumnDataType::Decimal128,
ValueData::ListValue(_) => ColumnDataType::List,
ValueData::StructValue(_) => ColumnDataType::Struct,
ValueData::JsonValue(_) => ColumnDataType::Json,
};
Some(value_type)
}
@@ -994,6 +1064,9 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue {
.collect();
Some(ValueData::StructValue(v1::StructValue { items }))
}
Value::Json(inner_value) => Some(ValueData::JsonValue(Box::new(value_to_grpc_value(
*inner_value,
)))),
Value::Duration(_) => unreachable!(),
},
}
@@ -1181,6 +1254,10 @@ mod tests {
let values = values_with_capacity(ColumnDataType::Struct, 2);
let values = values.struct_values;
assert_eq!(2, values.capacity());
let values = values_with_capacity(ColumnDataType::Json, 2);
assert_eq!(2, values.json_values.capacity());
assert_eq!(2, values.string_values.capacity());
}
#[test]
@@ -1304,6 +1381,54 @@ mod tests {
])
.into()
);
assert_eq!(
ConcreteDataType::json_native_datatype(ConcreteDataType::struct_datatype(
struct_type.clone()
)),
ColumnDataTypeWrapper::new(
ColumnDataType::Json,
Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonNativeType(Box::new(JsonNativeTypeExtension {
datatype: ColumnDataType::Struct.into(),
datatype_extension: Some(Box::new(ColumnDataTypeExtension {
type_ext: Some(TypeExt::StructType(StructTypeExtension {
fields: vec![
v1::StructField {
name: "id".to_string(),
datatype: ColumnDataTypeWrapper::int64_datatype()
.datatype()
.into(),
datatype_extension: None
},
v1::StructField {
name: "name".to_string(),
datatype: ColumnDataTypeWrapper::string_datatype()
.datatype()
.into(),
datatype_extension: None
},
v1::StructField {
name: "age".to_string(),
datatype: ColumnDataTypeWrapper::int32_datatype()
.datatype()
.into(),
datatype_extension: None
},
v1::StructField {
name: "address".to_string(),
datatype: ColumnDataTypeWrapper::string_datatype()
.datatype()
.into(),
datatype_extension: None
}
]
}))
}))
})))
})
)
.into()
)
}
#[test]
@@ -1429,7 +1554,71 @@ mod tests {
ConcreteDataType::list_datatype(ConcreteDataType::string_datatype()), true
)
])).try_into().expect("Failed to create column datatype from Struct(StructType { fields: [StructField { name: \"a\", data_type: Int64(Int64Type) }, StructField { name: \"a.a\", data_type: List(ListType { item_type: String(StringType) }) }] })")
)
);
let struct_type = StructType::new(vec![
StructField::new("id".to_string(), ConcreteDataType::int64_datatype(), true),
StructField::new(
"name".to_string(),
ConcreteDataType::string_datatype(),
true,
),
StructField::new("age".to_string(), ConcreteDataType::int32_datatype(), true),
StructField::new(
"address".to_string(),
ConcreteDataType::string_datatype(),
true,
),
]);
assert_eq!(
ColumnDataTypeWrapper::new(
ColumnDataType::Json,
Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonNativeType(Box::new(JsonNativeTypeExtension {
datatype: ColumnDataType::Struct.into(),
datatype_extension: Some(Box::new(ColumnDataTypeExtension {
type_ext: Some(TypeExt::StructType(StructTypeExtension {
fields: vec![
v1::StructField {
name: "id".to_string(),
datatype: ColumnDataTypeWrapper::int64_datatype()
.datatype()
.into(),
datatype_extension: None
},
v1::StructField {
name: "name".to_string(),
datatype: ColumnDataTypeWrapper::string_datatype()
.datatype()
.into(),
datatype_extension: None
},
v1::StructField {
name: "age".to_string(),
datatype: ColumnDataTypeWrapper::int32_datatype()
.datatype()
.into(),
datatype_extension: None
},
v1::StructField {
name: "address".to_string(),
datatype: ColumnDataTypeWrapper::string_datatype()
.datatype()
.into(),
datatype_extension: None
}
]
}))
}))
})))
})
),
ConcreteDataType::json_native_datatype(ConcreteDataType::struct_datatype(
struct_type.clone()
))
.try_into()
.expect("failed to convert json type")
);
}
#[test]

View File

@@ -90,6 +90,7 @@ fn impl_schema_method(fields: &[ParsedField<'_>]) -> Result<TokenStream2> {
Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::VectorType(VectorTypeExtension { dim: #dim })) })
}
}
// TODO(sunng87): revisit all these implementations
Some(TypeExt::ListType(ext)) => {
let item_type = syn::Ident::new(&ext.datatype.to_string(), ident.span());
quote! {
@@ -108,6 +109,12 @@ fn impl_schema_method(fields: &[ParsedField<'_>]) -> Result<TokenStream2> {
Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::StructType(StructTypeExtension { fields: [#(#fields),*] })) })
}
}
Some(TypeExt::JsonNativeType(ext)) => {
let inner = syn::Ident::new(&ext.datatype.to_string(), ident.span());
quote! {
Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::JsonNativeType(JsonNativeTypeExtension { datatype: #inner })) })
}
}
None => {
quote! { None }
}

View File

@@ -33,7 +33,7 @@ use datatypes::arrow::util::pretty;
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::scalars::{ScalarVector, ScalarVectorBuilder};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::types::json_type_value_to_string;
use datatypes::types::{JsonFormat, jsonb_to_string};
use datatypes::vectors::{BinaryVector, StringVectorBuilder};
use error::Result;
use futures::task::{Context, Poll};
@@ -90,7 +90,8 @@ pub fn map_json_type_to_string(
) -> Result<RecordBatch> {
let mut vectors = Vec::with_capacity(original_schema.column_schemas().len());
for (vector, schema) in batch.columns.iter().zip(original_schema.column_schemas()) {
if let ConcreteDataType::Json(j) = schema.data_type {
if let ConcreteDataType::Json(j) = &schema.data_type {
if matches!(&j.format, JsonFormat::Jsonb) {
let mut string_vector_builder = StringVectorBuilder::with_capacity(vector.len());
let binary_vector = vector
.as_any()
@@ -105,11 +106,9 @@ pub fn map_json_type_to_string(
continue;
};
let string_value =
json_type_value_to_string(value, &j.format).with_context(|_| {
error::CastVectorSnafu {
jsonb_to_string(value).with_context(|_| error::CastVectorSnafu {
from_type: schema.data_type.clone(),
to_type: ConcreteDataType::string_datatype(),
}
})?;
string_vector_builder.push(Some(string_value.as_str()));
}
@@ -119,6 +118,9 @@ pub fn map_json_type_to_string(
} else {
vectors.push(vector.clone());
}
} else {
vectors.push(vector.clone());
}
}
RecordBatch::new(mapped_schema.clone(), vectors)

View File

@@ -16,9 +16,10 @@ use std::str::FromStr;
use common_time::Timestamp;
use common_time::timezone::Timezone;
use datatypes::json::JsonStructureSettings;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnDefaultConstraint;
use datatypes::types::{parse_string_to_json_type_value, parse_string_to_vector_type_value};
use datatypes::types::{JsonFormat, parse_string_to_jsonb, parse_string_to_vector_type_value};
use datatypes::value::{OrderedF32, OrderedF64, Value};
use snafu::{OptionExt, ResultExt, ensure};
pub use sqlparser::ast::{
@@ -210,7 +211,8 @@ pub fn sql_value_to_value(
| Value::Duration(_)
| Value::IntervalYearMonth(_)
| Value::IntervalDayTime(_)
| Value::IntervalMonthDayNano(_) => match unary_op {
| Value::IntervalMonthDayNano(_)
| Value::Json(_) => match unary_op {
UnaryOperator::Plus => {}
UnaryOperator::Minus => {
value = value
@@ -297,9 +299,22 @@ pub(crate) fn parse_string_to_value(
}
ConcreteDataType::Binary(_) => Ok(Value::Binary(s.as_bytes().into())),
ConcreteDataType::Json(j) => {
let v = parse_string_to_json_type_value(&s, &j.format).context(DatatypeSnafu)?;
match &j.format {
JsonFormat::Jsonb => {
let v = parse_string_to_jsonb(&s).context(DatatypeSnafu)?;
Ok(Value::Binary(v.into()))
}
JsonFormat::Native(_inner) => {
// Always use the structured version at this level.
let serde_json_value =
serde_json::from_str(&s).context(DeserializeSnafu { json: s })?;
let json_structure_settings = JsonStructureSettings::Structured(None);
json_structure_settings
.encode(serde_json_value)
.context(DatatypeSnafu)
}
}
}
ConcreteDataType::Vector(d) => {
let v = parse_string_to_vector_type_value(&s, Some(d.dim)).context(DatatypeSnafu)?;
Ok(Value::Binary(v.into()))

View File

@@ -33,8 +33,8 @@ use crate::types::{
BinaryType, BooleanType, DateType, Decimal128Type, DictionaryType, DurationMicrosecondType,
DurationMillisecondType, DurationNanosecondType, DurationSecondType, DurationType, Float32Type,
Float64Type, Int8Type, Int16Type, Int32Type, Int64Type, IntervalDayTimeType,
IntervalMonthDayNanoType, IntervalType, IntervalYearMonthType, JsonType, ListType, NullType,
StringType, StructType, TimeMillisecondType, TimeType, TimestampMicrosecondType,
IntervalMonthDayNanoType, IntervalType, IntervalYearMonthType, JsonFormat, JsonType, ListType,
NullType, StringType, StructType, TimeMillisecondType, TimeType, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, TimestampType,
UInt8Type, UInt16Type, UInt32Type, UInt64Type, VectorType,
};
@@ -350,7 +350,7 @@ impl ConcreteDataType {
pub fn as_json(&self) -> Option<JsonType> {
match self {
ConcreteDataType::Json(j) => Some(*j),
ConcreteDataType::Json(j) => Some(j.clone()),
_ => None,
}
}
@@ -668,6 +668,10 @@ impl ConcreteDataType {
pub fn vector_default_datatype() -> ConcreteDataType {
Self::vector_datatype(0)
}
/// Creates a JSON data type that uses the native format, with `inner_type`
/// boxed as the format's underlying data type.
pub fn json_native_datatype(inner_type: ConcreteDataType) -> ConcreteDataType {
    let native_format = JsonFormat::Native(Box::new(inner_type));
    ConcreteDataType::Json(JsonType::new(native_format))
}
}
/// Data type abstraction.

View File

@@ -87,15 +87,16 @@ impl JsonStructureSettings {
decode_struct_with_settings(struct_value, &context)
}
/// Encode a serde_json::Value into a StructValue using current settings.
/// Encode a serde_json::Value into a Value::Json using current settings.
pub fn encode(&self, json: Json) -> Result<Value, Error> {
let context = JsonContext {
key_path: String::new(),
settings: self,
};
encode_json_with_context(json, None, &context)
encode_json_with_context(json, None, &context).map(|v| Value::Json(Box::new(v)))
}
/// Encode a serde_json::Value into a Value::Json with given data type.
pub fn encode_with_type(
&self,
json: Json,
@@ -105,7 +106,7 @@ impl JsonStructureSettings {
key_path: String::new(),
settings: self,
};
encode_json_with_context(json, data_type, &context)
encode_json_with_context(json, data_type, &context).map(|v| Value::Json(Box::new(v)))
}
}
@@ -394,6 +395,7 @@ pub fn decode_value_with_context<'a>(
}
match value {
Value::Json(inner) => decode_value_with_context(*inner, context),
Value::Struct(struct_value) => decode_struct_with_context(struct_value, context),
Value::List(list_value) => decode_list_with_context(list_value, context),
_ => decode_primitive_value(value),
@@ -507,7 +509,7 @@ fn decode_primitive_value(value: Value) -> Result<Json, Error> {
}
Value::Duration(v) => serde_json::to_value(v.value()).context(error::SerializeSnafu),
Value::Decimal128(v) => serde_json::to_value(v.to_string()).context(error::SerializeSnafu),
Value::Struct(_) | Value::List(_) => {
Value::Struct(_) | Value::List(_) | Value::Json(_) => {
// These should be handled by the context-aware functions
Err(error::InvalidJsonSnafu {
value: "Structured values should be handled by context-aware decoding".to_string(),
@@ -694,7 +696,11 @@ mod tests {
fn test_encode_json_null() {
let json = Json::Null;
let settings = JsonStructureSettings::Structured(None);
let result = settings.encode_with_type(json, None).unwrap();
let result = settings
.encode_with_type(json, None)
.unwrap()
.into_json_inner()
.unwrap();
assert_eq!(result, Value::Null);
}
@@ -702,7 +708,11 @@ mod tests {
fn test_encode_json_boolean() {
let json = Json::Bool(true);
let settings = JsonStructureSettings::Structured(None);
let result = settings.encode_with_type(json, None).unwrap();
let result = settings
.encode_with_type(json, None)
.unwrap()
.into_json_inner()
.unwrap();
assert_eq!(result, Value::Boolean(true));
}
@@ -710,7 +720,11 @@ mod tests {
fn test_encode_json_number_integer() {
let json = Json::from(42);
let settings = JsonStructureSettings::Structured(None);
let result = settings.encode_with_type(json, None).unwrap();
let result = settings
.encode_with_type(json, None)
.unwrap()
.into_json_inner()
.unwrap();
assert_eq!(result, Value::Int64(42));
}
@@ -718,7 +732,11 @@ mod tests {
fn test_encode_json_number_float() {
let json = Json::from(3.15);
let settings = JsonStructureSettings::Structured(None);
let result = settings.encode_with_type(json, None).unwrap();
let result = settings
.encode_with_type(json, None)
.unwrap()
.into_json_inner()
.unwrap();
match result {
Value::Float64(f) => assert_eq!(f.0, 3.15),
_ => panic!("Expected Float64"),
@@ -729,7 +747,11 @@ mod tests {
fn test_encode_json_string() {
let json = Json::String("hello".to_string());
let settings = JsonStructureSettings::Structured(None);
let result = settings.encode_with_type(json, None).unwrap();
let result = settings
.encode_with_type(json, None)
.unwrap()
.into_json_inner()
.unwrap();
assert_eq!(result, Value::String("hello".into()));
}
@@ -737,7 +759,11 @@ mod tests {
fn test_encode_json_array() {
let json = json!([1, 2, 3]);
let settings = JsonStructureSettings::Structured(None);
let result = settings.encode_with_type(json, None).unwrap();
let result = settings
.encode_with_type(json, None)
.unwrap()
.into_json_inner()
.unwrap();
if let Value::List(list_value) = result {
assert_eq!(list_value.items().len(), 3);
@@ -758,7 +784,11 @@ mod tests {
});
let settings = JsonStructureSettings::Structured(None);
let result = settings.encode_with_type(json, None).unwrap();
let result = settings
.encode_with_type(json, None)
.unwrap()
.into_json_inner()
.unwrap();
let Value::Struct(result) = result else {
panic!("Expected Struct value");
};
@@ -804,7 +834,11 @@ mod tests {
});
let settings = JsonStructureSettings::Structured(None);
let result = settings.encode_with_type(json, None).unwrap();
let result = settings
.encode_with_type(json, None)
.unwrap()
.into_json_inner()
.unwrap();
let Value::Struct(result) = result else {
panic!("Expected Struct value");
};
@@ -856,12 +890,16 @@ mod tests {
let settings = JsonStructureSettings::Structured(None);
let result = settings
.encode_with_type(json.clone(), Some(&ConcreteDataType::int8_datatype()))
.unwrap()
.into_json_inner()
.unwrap();
assert_eq!(result, Value::Int8(42));
// Test with expected string type
let result = settings
.encode_with_type(json, Some(&ConcreteDataType::string_datatype()))
.unwrap()
.into_json_inner()
.unwrap();
assert_eq!(result, Value::String("42".into()));
}
@@ -870,7 +908,11 @@ mod tests {
fn test_encode_json_array_mixed_types() {
let json = json!([1, "hello", true, 3.15]);
let settings = JsonStructureSettings::Structured(None);
let result = settings.encode_with_type(json, None).unwrap();
let result = settings
.encode_with_type(json, None)
.unwrap()
.into_json_inner()
.unwrap();
if let Value::List(list_value) = result {
assert_eq!(list_value.items().len(), 4);
@@ -889,7 +931,11 @@ mod tests {
fn test_encode_json_empty_array() {
let json = json!([]);
let settings = JsonStructureSettings::Structured(None);
let result = settings.encode_with_type(json, None).unwrap();
let result = settings
.encode_with_type(json, None)
.unwrap()
.into_json_inner()
.unwrap();
if let Value::List(list_value) = result {
assert_eq!(list_value.items().len(), 0);
@@ -911,7 +957,7 @@ mod tests {
});
let settings = JsonStructureSettings::Structured(None);
let result = settings.encode(json).unwrap();
let result = settings.encode(json).unwrap().into_json_inner().unwrap();
if let Value::Struct(struct_value) = result {
assert_eq!(struct_value.items().len(), 2);
@@ -950,6 +996,8 @@ mod tests {
let settings = JsonStructureSettings::Structured(None);
let result = settings
.encode_with_type(json, Some(&concrete_type))
.unwrap()
.into_json_inner()
.unwrap();
if let Value::Struct(struct_value) = result {
@@ -1110,7 +1158,7 @@ mod tests {
});
let settings = JsonStructureSettings::Structured(None);
let result = settings.encode(json).unwrap();
let result = settings.encode(json).unwrap().into_json_inner().unwrap();
if let Value::Struct(struct_value) = result {
assert_eq!(struct_value.items().len(), 2);
@@ -1127,6 +1175,8 @@ mod tests {
let settings = JsonStructureSettings::Structured(None);
let result = settings
.encode_with_type(json, Some(&concrete_type))
.unwrap()
.into_json_inner()
.unwrap();
if let Value::List(list_value) = result {
@@ -1151,6 +1201,8 @@ mod tests {
let settings = JsonStructureSettings::Structured(None);
let result = settings
.encode_with_type(json, Some(&concrete_type))
.unwrap()
.into_json_inner()
.unwrap();
if let Value::List(list_value) = result {
@@ -1420,6 +1472,8 @@ mod tests {
let json = Json::from(42);
let result = settings
.encode_with_type(json, Some(&ConcreteDataType::int64_datatype()))
.unwrap()
.into_json_inner()
.unwrap();
assert_eq!(result, Value::Int64(42));
@@ -1427,6 +1481,8 @@ mod tests {
let json = Json::String("hello".to_string());
let result = settings
.encode_with_type(json, Some(&ConcreteDataType::string_datatype()))
.unwrap()
.into_json_inner()
.unwrap();
assert_eq!(result, Value::String("hello".into()));
@@ -1434,6 +1490,8 @@ mod tests {
let json = Json::Bool(true);
let result = settings
.encode_with_type(json, Some(&ConcreteDataType::boolean_datatype()))
.unwrap()
.into_json_inner()
.unwrap();
assert_eq!(result, Value::Boolean(true));
}
@@ -1461,6 +1519,8 @@ mod tests {
let settings = JsonStructureSettings::Structured(None);
let result = settings
.encode_with_type(json, Some(&concrete_type))
.unwrap()
.into_json_inner()
.unwrap();
if let Value::List(list_value) = result {
@@ -1484,6 +1544,8 @@ mod tests {
let settings = JsonStructureSettings::Structured(None);
let result = settings
.encode_with_type(json.clone(), Some(&ConcreteDataType::null_datatype()))
.unwrap()
.into_json_inner()
.unwrap();
assert_eq!(result, Value::Null);
@@ -1491,6 +1553,8 @@ mod tests {
let json = Json::from(3.15);
let result = settings
.encode_with_type(json, Some(&ConcreteDataType::float64_datatype()))
.unwrap()
.into_json_inner()
.unwrap();
match result {
Value::Float64(f) => assert_eq!(f.0, 3.15),
@@ -1503,12 +1567,20 @@ mod tests {
// Test unsigned integer that fits in i64
let json = Json::from(u64::MAX / 2);
let settings = JsonStructureSettings::Structured(None);
let result = settings.encode_with_type(json, None).unwrap();
let result = settings
.encode_with_type(json, None)
.unwrap()
.into_json_inner()
.unwrap();
assert_eq!(result, Value::Int64((u64::MAX / 2) as i64));
// Test unsigned integer that exceeds i64 range
let json = Json::from(u64::MAX);
let result = settings.encode_with_type(json, None).unwrap();
let result = settings
.encode_with_type(json, None)
.unwrap()
.into_json_inner()
.unwrap();
assert_eq!(result, Value::UInt64(u64::MAX));
}
@@ -1520,7 +1592,7 @@ mod tests {
});
let settings = JsonStructureSettings::UnstructuredRaw;
let result = settings.encode(json).unwrap();
let result = settings.encode(json).unwrap().into_json_inner().unwrap();
if let Value::Struct(struct_value) = result {
assert_eq!(struct_value.struct_type().fields().len(), 1);
@@ -1553,7 +1625,11 @@ mod tests {
let settings = JsonStructureSettings::UnstructuredRaw;
// Test with encode (no type)
let result = settings.encode(json.clone()).unwrap();
let result = settings
.encode(json.clone())
.unwrap()
.into_json_inner()
.unwrap();
if let Value::Struct(s) = result {
if let Value::String(json_str) = &s.items()[0] {
let json_str = json_str.as_utf8();
@@ -1585,6 +1661,8 @@ mod tests {
let result2 = settings
.encode_with_type(json, Some(&concrete_type))
.unwrap()
.into_json_inner()
.unwrap();
if let Value::Struct(s) = result2 {
if let Value::String(json_str) = &s.items()[0] {
@@ -1609,7 +1687,11 @@ mod tests {
}
});
let result3 = settings.encode(nested_json).unwrap();
let result3 = settings
.encode(nested_json)
.unwrap()
.into_json_inner()
.unwrap();
if let Value::Struct(s) = result3 {
if let Value::String(json_str) = &s.items()[0] {
let json_str = json_str.as_utf8();
@@ -1627,7 +1709,11 @@ mod tests {
// Test with arrays
let array_json = json!([1, "hello", true, 3.15]);
let result4 = settings.encode(array_json).unwrap();
let result4 = settings
.encode(array_json)
.unwrap()
.into_json_inner()
.unwrap();
if let Value::Struct(s) = result4 {
if let Value::String(json_str) = &s.items()[0] {
let json_str = json_str.as_utf8();
@@ -1659,7 +1745,7 @@ mod tests {
fields: None,
unstructured_keys,
};
let result = settings.encode(json).unwrap();
let result = settings.encode(json).unwrap().into_json_inner().unwrap();
if let Value::Struct(struct_value) = result {
let items = struct_value.items();
@@ -2160,7 +2246,11 @@ mod tests {
});
// Encode the JSON with partial unstructured settings
let encoded_value = settings.encode(original_json).unwrap();
let encoded_value = settings
.encode(original_json)
.unwrap()
.into_json_inner()
.unwrap();
// Verify encoding worked - metadata and user.profile.settings should be unstructured
if let Value::Struct(encoded_struct) = encoded_value {

View File

@@ -44,8 +44,8 @@ pub use interval_type::{
IntervalDayTimeType, IntervalMonthDayNanoType, IntervalType, IntervalYearMonthType,
};
pub use json_type::{
JSON_TYPE_NAME, JsonType, json_type_value_to_serde_json, json_type_value_to_string,
parse_string_to_json_type_value,
JSON_TYPE_NAME, JsonFormat, JsonType, jsonb_to_serde_json, jsonb_to_string,
parse_string_to_jsonb,
};
pub use list_type::ListType;
pub use null_type::NullType;

View File

@@ -21,6 +21,7 @@ use snafu::ResultExt;
use crate::data_type::DataType;
use crate::error::{DeserializeSnafu, InvalidJsonSnafu, InvalidJsonbSnafu, Result};
use crate::prelude::ConcreteDataType;
use crate::scalars::ScalarVectorBuilder;
use crate::type_id::LogicalTypeId;
use crate::value::Value;
@@ -28,19 +29,16 @@ use crate::vectors::{BinaryVectorBuilder, MutableVector};
pub const JSON_TYPE_NAME: &str = "Json";
#[derive(
Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, Default,
)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, Default)]
pub enum JsonFormat {
#[default]
Jsonb,
Native(Box<ConcreteDataType>),
}
/// JsonType is a data type for JSON data. It is stored as binary data of jsonb format.
/// It utilizes current binary value and vector implementation.
#[derive(
Debug, Default, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize,
)]
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct JsonType {
pub format: JsonFormat,
}
@@ -81,34 +79,26 @@ impl DataType for JsonType {
}
/// Converts a json type value to string
pub fn json_type_value_to_string(val: &[u8], format: &JsonFormat) -> Result<String> {
match format {
JsonFormat::Jsonb => match jsonb::from_slice(val) {
pub fn jsonb_to_string(val: &[u8]) -> Result<String> {
match jsonb::from_slice(val) {
Ok(jsonb_value) => {
let serialized = jsonb_value.to_string();
Ok(serialized)
}
Err(e) => InvalidJsonbSnafu { error: e }.fail(),
},
}
}
/// Converts a json type value to serde_json::Value
pub fn json_type_value_to_serde_json(val: &[u8], format: &JsonFormat) -> Result<serde_json::Value> {
match format {
JsonFormat::Jsonb => {
let json_string = json_type_value_to_string(val, format)?;
pub fn jsonb_to_serde_json(val: &[u8]) -> Result<serde_json::Value> {
let json_string = jsonb_to_string(val)?;
serde_json::Value::from_str(json_string.as_str())
.context(DeserializeSnafu { json: json_string })
}
}
}
/// Parses a string to a json type value
pub fn parse_string_to_json_type_value(s: &str, format: &JsonFormat) -> Result<Vec<u8>> {
match format {
JsonFormat::Jsonb => jsonb::parse_value(s.as_bytes())
pub fn parse_string_to_jsonb(s: &str) -> Result<Vec<u8>> {
jsonb::parse_value(s.as_bytes())
.map_err(|_| InvalidJsonSnafu { value: s }.build())
.map(|json| json.to_vec()),
}
.map(|json| json.to_vec())
}

View File

@@ -89,6 +89,8 @@ impl DataType for StringType {
Value::Duration(v) => Some(Value::String(StringBytes::from(v.to_string()))),
Value::Decimal128(v) => Some(Value::String(StringBytes::from(v.to_string()))),
Value::Json(v) => self.try_cast(*v),
// StringBytes only supports UTF-8, so Value::Binary and collections are not allowed.
Value::Binary(_) | Value::List(_) | Value::Struct(_) => None,
}

View File

@@ -81,8 +81,12 @@ pub enum Value {
IntervalDayTime(IntervalDayTime),
IntervalMonthDayNano(IntervalMonthDayNano),
// Collection types:
List(ListValue),
Struct(StructValue),
// Json Logical types:
Json(Box<Value>),
}
impl Display for Value {
@@ -144,6 +148,9 @@ impl Display for Value {
.join(", ");
write!(f, "{{ {items} }}")
}
Value::Json(json_data) => {
write!(f, "Json({})", json_data)
}
}
}
}
@@ -190,6 +197,7 @@ macro_rules! define_data_type_func {
$struct::Struct(struct_value) => {
ConcreteDataType::struct_datatype(struct_value.struct_type().clone())
}
$struct::Json(v) => ConcreteDataType::json_native_datatype(v.data_type()),
}
}
};
@@ -200,7 +208,11 @@ impl Value {
/// Returns true if this is a null value.
///
/// A [`Value::Json`] wrapper is considered null when the value it wraps is null.
pub fn is_null(&self) -> bool {
    match self {
        Value::Null => true,
        // Look through the JSON wrapper and ask the wrapped value.
        Value::Json(inner) => inner.is_null(),
        _ => false,
    }
}
/// Cast itself to [ListValue].
@@ -208,6 +220,7 @@ impl Value {
match self {
Value::Null => Ok(None),
Value::List(v) => Ok(Some(v)),
Value::Json(inner) => inner.as_list(),
other => error::CastTypeSnafu {
msg: format!("Failed to cast {other:?} to list value"),
}
@@ -219,6 +232,7 @@ impl Value {
match self {
Value::Null => Ok(None),
Value::Struct(v) => Ok(Some(v)),
Value::Json(inner) => inner.as_struct(),
other => error::CastTypeSnafu {
msg: format!("Failed to cast {other:?} to struct value"),
}
@@ -253,6 +267,7 @@ impl Value {
Value::Duration(v) => ValueRef::Duration(*v),
Value::Decimal128(v) => ValueRef::Decimal128(*v),
Value::Struct(v) => ValueRef::Struct(StructValueRef::Ref(v)),
Value::Json(v) => ValueRef::Json(Box::new(v.as_value_ref())),
}
}
@@ -322,6 +337,7 @@ impl Value {
Value::UInt8(v) => Some(*v as _),
Value::UInt16(v) => Some(*v as _),
Value::UInt32(v) => Some(*v as _),
Value::Json(inner) => inner.as_i64(),
_ => None,
}
}
@@ -333,6 +349,7 @@ impl Value {
Value::UInt16(v) => Some(*v as _),
Value::UInt32(v) => Some(*v as _),
Value::UInt64(v) => Some(*v),
Value::Json(inner) => inner.as_u64(),
_ => None,
}
}
@@ -349,6 +366,7 @@ impl Value {
Value::UInt16(v) => Some(*v as _),
Value::UInt32(v) => Some(*v as _),
Value::UInt64(v) => Some(*v as _),
Value::Json(inner) => inner.as_f64_lossy(),
_ => None,
}
}
@@ -365,6 +383,15 @@ impl Value {
pub fn as_bool(&self) -> Option<bool> {
    // A JSON wrapper is transparent here: defer to the value it boxes.
    if let Value::Json(wrapped) = self {
        return wrapped.as_bool();
    }
    // Only a plain boolean yields a result; every other variant gives `None`.
    if let Value::Boolean(flag) = self {
        Some(*flag)
    } else {
        None
    }
}
/// Consumes the value and unwraps a [`Value::Json`] into the value it boxes.
///
/// Returns `None` for every other variant.
pub fn into_json_inner(self) -> Option<Value> {
    if let Value::Json(boxed) = self {
        Some(*boxed)
    } else {
        None
    }
}
@@ -411,6 +438,7 @@ impl Value {
},
Value::Decimal128(_) => LogicalTypeId::Decimal128,
Value::Struct(_) => LogicalTypeId::Struct,
Value::Json(_) => LogicalTypeId::Json,
}
}
@@ -420,11 +448,11 @@ impl Value {
let value_type_id = self.logical_type_id();
let output_type_id = output_type.logical_type_id();
ensure!(
// Json type leverages Value(Binary) for storage.
output_type_id == value_type_id
|| self.is_null()
|| (output_type_id == LogicalTypeId::Json
&& value_type_id == LogicalTypeId::Binary),
&& (value_type_id == LogicalTypeId::Binary
|| value_type_id == LogicalTypeId::Json)),
error::ToScalarValueSnafu {
reason: format!(
"expect value to return output_type {output_type_id:?}, actual: {value_type_id:?}",
@@ -467,6 +495,7 @@ impl Value {
let struct_type = output_type.as_struct().unwrap();
struct_value.try_to_scalar_value(struct_type)?
}
Value::Json(v) => v.try_to_scalar_value(output_type)?,
};
Ok(scalar_value)
@@ -519,6 +548,8 @@ impl Value {
Value::IntervalDayTime(x) => Some(Value::IntervalDayTime(x.negative())),
Value::IntervalMonthDayNano(x) => Some(Value::IntervalMonthDayNano(x.negative())),
Value::Json(v) => v.try_negative().map(|neg| Value::Json(Box::new(neg))),
Value::Binary(_)
| Value::String(_)
| Value::Boolean(_)
@@ -877,6 +908,7 @@ impl TryFrom<Value> for serde_json::Value {
.collect::<serde_json::Result<Map<String, serde_json::Value>>>()?;
serde_json::Value::Object(map)
}
Value::Json(v) => serde_json::Value::try_from(*v)?,
};
Ok(json_value)
@@ -1205,6 +1237,7 @@ impl From<ValueRef<'_>> for Value {
ValueRef::List(v) => v.to_value(),
ValueRef::Decimal128(v) => Value::Decimal128(v),
ValueRef::Struct(v) => v.to_value(),
ValueRef::Json(v) => Value::Json(Box::new(Value::from(*v))),
}
}
}
@@ -1247,6 +1280,8 @@ pub enum ValueRef<'a> {
// Compound types:
List(ListValueRef<'a>),
Struct(StructValueRef<'a>),
Json(Box<ValueRef<'a>>),
}
macro_rules! impl_as_for_value_ref {
@@ -1271,7 +1306,11 @@ impl<'a> ValueRef<'a> {
/// Returns true if this is null.
///
/// A [`ValueRef::Json`] wrapper is considered null when the reference it wraps is null.
pub fn is_null(&self) -> bool {
    match self {
        ValueRef::Null => true,
        // Look through the JSON wrapper and ask the wrapped reference.
        ValueRef::Json(v) => v.is_null(),
        _ => false,
    }
}
/// Cast itself to binary slice.
@@ -1648,6 +1687,7 @@ impl ValueRef<'_> {
StructValueRef::Ref(val) => val.estimated_size(),
StructValueRef::RefList { val, .. } => val.iter().map(|v| v.data_size()).sum(),
},
ValueRef::Json(v) => v.data_size(),
}
}
}
@@ -1710,6 +1750,10 @@ pub(crate) mod tests {
ScalarValue::Struct(Arc::new(struct_arrow_array))
}
/// Builds the list datatype constructed from the boolean datatype.
pub fn build_list_type() -> ConcreteDataType {
    let boolean_type = ConcreteDataType::boolean_datatype();
    ConcreteDataType::list_datatype(boolean_type)
}
pub(crate) fn build_list_value() -> ListValue {
let items = vec![Value::Boolean(true), Value::Boolean(false)];
ListValue::new(items, ConcreteDataType::boolean_datatype())
@@ -2185,6 +2229,23 @@ pub(crate) mod tests {
&ConcreteDataType::struct_datatype(build_struct_type()),
&Value::Struct(build_struct_value()),
);
check_type_and_value(
&ConcreteDataType::json_native_datatype(ConcreteDataType::boolean_datatype()),
&Value::Json(Box::new(Value::Boolean(true))),
);
check_type_and_value(
&ConcreteDataType::json_native_datatype(build_list_type()),
&Value::Json(Box::new(Value::List(build_list_value()))),
);
check_type_and_value(
&ConcreteDataType::json_native_datatype(ConcreteDataType::struct_datatype(
build_struct_type(),
)),
&Value::Json(Box::new(Value::Struct(build_struct_value()))),
);
}
#[test]
@@ -2315,7 +2376,35 @@ pub(crate) mod tests {
)
.unwrap();
assert_eq!(
serde_json::Value::try_from(Value::Struct(struct_value)).unwrap(),
serde_json::Value::try_from(Value::Struct(struct_value.clone())).unwrap(),
serde_json::json!({
"num": 42,
"name": "tomcat",
"yes_or_no": true
})
);
// string wrapped in json
assert_eq!(
serde_json::Value::try_from(Value::Json(Box::new(Value::String("hello".into()))))
.unwrap(),
serde_json::json!("hello")
);
// list wrapped in json
assert_eq!(
serde_json::Value::try_from(Value::Json(Box::new(Value::List(ListValue::new(
vec![Value::Int32(1), Value::Int32(2), Value::Int32(3),],
ConcreteDataType::int32_datatype()
)))))
.unwrap(),
serde_json::json!([1, 2, 3])
);
// struct wrapped in json
assert_eq!(
serde_json::Value::try_from(Value::Json(Box::new(Value::Struct(struct_value))))
.unwrap(),
serde_json::json!({
"num": 42,
"name": "tomcat",
@@ -2327,6 +2416,7 @@ pub(crate) mod tests {
#[test]
fn test_null_value() {
assert!(Value::Null.is_null());
assert!(Value::Json(Box::new(Value::Null)).is_null());
assert!(!Value::Boolean(true).is_null());
assert!(Value::Null < Value::Boolean(false));
assert!(Value::Boolean(true) > Value::Null);
@@ -2405,6 +2495,13 @@ pub(crate) mod tests {
ValueRef::Struct(StructValueRef::Ref(&struct_value)),
Value::Struct(struct_value.clone()).as_value_ref()
);
assert_eq!(
ValueRef::Json(Box::new(ValueRef::Struct(StructValueRef::Ref(
&struct_value
)))),
Value::Json(Box::new(Value::Struct(struct_value.clone()))).as_value_ref()
);
}
#[test]
@@ -2525,6 +2622,11 @@ pub(crate) mod tests {
Value::Struct(build_struct_value()).to_string(),
"{ id: 1, name: tom, age: 25, address: 94038, awards: Boolean[true, false] }"
);
assert_eq!(
Value::Json(Box::new(Value::Struct(build_struct_value()))).to_string(),
"Json({ id: 1, name: tom, age: 25, address: 94038, awards: Boolean[true, false] })"
)
}
#[test]
@@ -3013,6 +3115,13 @@ pub(crate) mod tests {
&ValueRef::Struct(StructValueRef::Ref(&build_struct_value())),
15,
);
check_value_ref_size_eq(
&ValueRef::Json(Box::new(ValueRef::Struct(StructValueRef::Ref(
&build_struct_value(),
)))),
15,
);
}
#[test]

View File

@@ -229,6 +229,12 @@ impl Limiter {
.unwrap_or(0)
})
.sum(),
ValueData::JsonValue(inner) => inner
.as_ref()
.value_data
.as_ref()
.map(Self::size_of_value_data)
.unwrap_or(0),
}
}
}

View File

@@ -34,7 +34,7 @@ use common_time::timestamp::TimeUnit;
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::SchemaRef;
use datatypes::types::json_type_value_to_serde_json;
use datatypes::types::jsonb_to_serde_json;
use event::{LogState, LogValidatorRef};
use futures::FutureExt;
use http::{HeaderValue, Method};
@@ -301,11 +301,11 @@ impl HttpRecordsOutput {
let schema = &schemas[col_idx];
for row_idx in 0..recordbatch.num_rows() {
let value = col.get(row_idx);
let value = if let ConcreteDataType::Json(json_type) = schema.data_type
// TODO(sunng87): is this duplicated with `map_json_type_to_string` in recordbatch?
let value = if let ConcreteDataType::Json(_json_type) = &schema.data_type
&& let datatypes::value::Value::Binary(bytes) = value
{
json_type_value_to_serde_json(bytes.as_ref(), &json_type.format)
.context(ConvertSqlValueSnafu)?
jsonb_to_serde_json(bytes.as_ref()).context(ConvertSqlValueSnafu)?
} else {
serde_json::Value::try_from(col.get(row_idx)).context(ToJsonSnafu)?
};

View File

@@ -21,7 +21,7 @@ use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::{debug, error};
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::schema::SchemaRef;
use datatypes::types::json_type_value_to_string;
use datatypes::types::jsonb_to_string;
use futures::StreamExt;
use itertools::Itertools;
use opensrv_mysql::{
@@ -31,7 +31,7 @@ use session::context::QueryContextRef;
use snafu::prelude::*;
use tokio::io::AsyncWrite;
use crate::error::{self, ConvertSqlValueSnafu, Result};
use crate::error::{self, ConvertSqlValueSnafu, Result, ToJsonSnafu};
use crate::metrics::*;
/// Try to write multiple output to the writer if possible.
@@ -212,10 +212,9 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
Value::Float32(v) => row_writer.write_col(v.0)?,
Value::Float64(v) => row_writer.write_col(v.0)?,
Value::String(v) => row_writer.write_col(v.as_utf8())?,
Value::Binary(v) => match column.data_type {
ConcreteDataType::Json(j) => {
let s = json_type_value_to_string(&v, &j.format)
.context(ConvertSqlValueSnafu)?;
Value::Binary(v) => match &column.data_type {
ConcreteDataType::Json(_j) => {
let s = jsonb_to_string(&v).context(ConvertSqlValueSnafu)?;
row_writer.write_col(s)?;
}
_ => {
@@ -248,6 +247,11 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
.map(|(k, v)| format!("{k}: {v}"))
.join(", ")
))?,
Value::Json(inner) => {
let json_value =
serde_json::Value::try_from(*inner).context(ToJsonSnafu)?;
row_writer.write_col(json_value.to_string())?
}
Value::Time(v) => row_writer
.write_col(v.to_timezone_aware_string(Some(&query_context.timezone())))?,
Value::Decimal128(v) => row_writer.write_col(v.to_string())?,

View File

@@ -28,7 +28,7 @@ use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::json::JsonStructureSettings;
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::schema::Schema;
use datatypes::types::{IntervalType, TimestampType, json_type_value_to_string};
use datatypes::types::{IntervalType, JsonFormat, TimestampType, jsonb_to_string};
use datatypes::value::{ListValue, StructValue};
use pgwire::api::Type;
use pgwire::api::portal::{Format, Portal};
@@ -90,8 +90,8 @@ fn encode_array(
value_list: ListValue,
builder: &mut DataRowEncoder,
) -> PgWireResult<()> {
match value_list.datatype() {
&ConcreteDataType::Boolean(_) => {
match &value_list.datatype() {
ConcreteDataType::Boolean(_) => {
let array = value_list
.items()
.iter()
@@ -105,7 +105,7 @@ fn encode_array(
.collect::<PgWireResult<Vec<Option<bool>>>>()?;
builder.encode_field(&array)
}
&ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => {
ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => {
let array = value_list
.items()
.iter()
@@ -122,7 +122,7 @@ fn encode_array(
.collect::<PgWireResult<Vec<Option<i8>>>>()?;
builder.encode_field(&array)
}
&ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => {
ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => {
let array = value_list
.items()
.iter()
@@ -139,7 +139,7 @@ fn encode_array(
.collect::<PgWireResult<Vec<Option<i16>>>>()?;
builder.encode_field(&array)
}
&ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => {
ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => {
let array = value_list
.items()
.iter()
@@ -156,7 +156,7 @@ fn encode_array(
.collect::<PgWireResult<Vec<Option<i32>>>>()?;
builder.encode_field(&array)
}
&ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => {
ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => {
let array = value_list
.items()
.iter()
@@ -173,7 +173,7 @@ fn encode_array(
.collect::<PgWireResult<Vec<Option<i64>>>>()?;
builder.encode_field(&array)
}
&ConcreteDataType::Float32(_) => {
ConcreteDataType::Float32(_) => {
let array = value_list
.items()
.iter()
@@ -187,7 +187,7 @@ fn encode_array(
.collect::<PgWireResult<Vec<Option<f32>>>>()?;
builder.encode_field(&array)
}
&ConcreteDataType::Float64(_) => {
ConcreteDataType::Float64(_) => {
let array = value_list
.items()
.iter()
@@ -201,7 +201,7 @@ fn encode_array(
.collect::<PgWireResult<Vec<Option<f64>>>>()?;
builder.encode_field(&array)
}
&ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => {
ConcreteDataType::Binary(_) | ConcreteDataType::Vector(_) => {
let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output();
match *bytea_output {
@@ -241,7 +241,7 @@ fn encode_array(
}
}
}
&ConcreteDataType::String(_) => {
ConcreteDataType::String(_) => {
let array = value_list
.items()
.iter()
@@ -255,7 +255,7 @@ fn encode_array(
.collect::<PgWireResult<Vec<Option<&str>>>>()?;
builder.encode_field(&array)
}
&ConcreteDataType::Date(_) => {
ConcreteDataType::Date(_) => {
let array = value_list
.items()
.iter()
@@ -279,7 +279,7 @@ fn encode_array(
.collect::<PgWireResult<Vec<Option<StylingDate>>>>()?;
builder.encode_field(&array)
}
&ConcreteDataType::Timestamp(_) => {
ConcreteDataType::Timestamp(_) => {
let array = value_list
.items()
.iter()
@@ -305,7 +305,7 @@ fn encode_array(
.collect::<PgWireResult<Vec<Option<StylingDateTime>>>>()?;
builder.encode_field(&array)
}
&ConcreteDataType::Time(_) => {
ConcreteDataType::Time(_) => {
let array = value_list
.items()
.iter()
@@ -319,7 +319,7 @@ fn encode_array(
.collect::<PgWireResult<Vec<Option<NaiveTime>>>>()?;
builder.encode_field(&array)
}
&ConcreteDataType::Interval(_) => {
ConcreteDataType::Interval(_) => {
let array = value_list
.items()
.iter()
@@ -335,7 +335,7 @@ fn encode_array(
.collect::<PgWireResult<Vec<Option<PgInterval>>>>()?;
builder.encode_field(&array)
}
&ConcreteDataType::Decimal128(_) => {
ConcreteDataType::Decimal128(_) => {
let array = value_list
.items()
.iter()
@@ -349,16 +349,18 @@ fn encode_array(
.collect::<PgWireResult<Vec<Option<String>>>>()?;
builder.encode_field(&array)
}
&ConcreteDataType::Json(j) => {
ConcreteDataType::Json(j) => match &j.format {
JsonFormat::Jsonb => {
let array = value_list
.items()
.iter()
.take_items()
.into_iter()
.map(|v| match v {
Value::Null => Ok(None),
Value::Binary(v) => {
let s = json_type_value_to_string(v, &j.format).map_err(convert_err)?;
let s = jsonb_to_string(&v).map_err(convert_err)?;
Ok(Some(s))
}
_ => Err(convert_err(Error::Internal {
err_msg: format!("Invalid list item type, find {v:?}, expected json",),
})),
@@ -366,6 +368,23 @@ fn encode_array(
.collect::<PgWireResult<Vec<Option<String>>>>()?;
builder.encode_field(&array)
}
JsonFormat::Native(_) => {
let array = value_list
.take_items()
.into_iter()
.map(|v| match v {
Value::Null => Ok(None),
Value::Json(inner) => serde_json::Value::try_from(*inner)
.map(Some)
.map_err(|e| PgWireError::ApiError(Box::new(e))),
_ => Err(convert_err(Error::Internal {
err_msg: format!("Invalid list item type, find {v:?}, expected json",),
})),
})
.collect::<PgWireResult<Vec<Option<serde_json::Value>>>>()?;
builder.encode_field(&array)
}
},
_ => Err(convert_err(Error::Internal {
err_msg: format!(
"cannot write array type {:?} in postgres protocol: unimplemented",
@@ -396,8 +415,8 @@ pub(super) fn encode_value(
Value::Float64(v) => builder.encode_field(&v.0),
Value::String(v) => builder.encode_field(&v.as_utf8()),
Value::Binary(v) => match datatype {
ConcreteDataType::Json(j) => {
let s = json_type_value_to_string(v.as_ref(), &j.format).map_err(convert_err)?;
ConcreteDataType::Json(_j) => {
let s = jsonb_to_string(v.as_ref()).map_err(convert_err)?;
builder.encode_field(&s)
}
_ => {
@@ -452,6 +471,11 @@ pub(super) fn encode_value(
},
Value::List(values) => encode_array(query_ctx, values, builder),
Value::Struct(values) => encode_struct(query_ctx, values, builder),
Value::Json(inner) => {
let json_value = serde_json::Value::try_from(*inner)
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
builder.encode_field(&json_value)
}
}
}
@@ -491,9 +515,7 @@ pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
&ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC_ARRAY),
&ConcreteDataType::Json(_) => Ok(Type::JSON_ARRAY),
&ConcreteDataType::Duration(_) => Ok(Type::INTERVAL_ARRAY),
// TODO(sunng87) we may treat list/array as json directly so we can
// support deeply nested data structures
&ConcreteDataType::Struct(_) => Ok(Type::RECORD_ARRAY),
&ConcreteDataType::Struct(_) => Ok(Type::JSON_ARRAY),
&ConcreteDataType::Dictionary(_)
| &ConcreteDataType::Vector(_)
| &ConcreteDataType::List(_) => server_error::UnsupportedDataTypeSnafu {
@@ -508,7 +530,7 @@ pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
}
.fail(),
&ConcreteDataType::Duration(_) => Ok(Type::INTERVAL),
&ConcreteDataType::Struct(_) => Ok(Type::RECORD),
&ConcreteDataType::Struct(_) => Ok(Type::JSON),
}
}