diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 64baae1187..2cd338e177 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -268,7 +268,7 @@ impl TryFrom for ColumnDataTypeWrapper { ConcreteDataType::UInt64(_) => ColumnDataType::Uint64, ConcreteDataType::Float32(_) => ColumnDataType::Float32, ConcreteDataType::Float64(_) => ColumnDataType::Float64, - ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) => ColumnDataType::Binary, + ConcreteDataType::Binary(_) => ColumnDataType::Binary, ConcreteDataType::String(_) => ColumnDataType::String, ConcreteDataType::Date(_) => ColumnDataType::Date, ConcreteDataType::DateTime(_) => ColumnDataType::Datetime, @@ -290,6 +290,7 @@ impl TryFrom for ColumnDataTypeWrapper { IntervalType::MonthDayNano(_) => ColumnDataType::IntervalMonthDayNano, }, ConcreteDataType::Decimal128(_) => ColumnDataType::Decimal128, + ConcreteDataType::Json(_) => ColumnDataType::Json, ConcreteDataType::Vector(_) => ColumnDataType::Vector, ConcreteDataType::Null(_) | ConcreteDataType::List(_) @@ -309,16 +310,9 @@ impl TryFrom for ColumnDataTypeWrapper { })), }) } - ColumnDataType::Binary => { - if datatype == ConcreteDataType::json_datatype() { - // Json is the same as binary in proto. The extension marks the binary in proto is actually a json. - Some(ColumnDataTypeExtension { - type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), - }) - } else { - None - } - } + ColumnDataType::Json => datatype.as_json().map(|_| ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), + }), ColumnDataType::Vector => { datatype .as_vector() diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index a967ad5fc8..8f81a0c86f 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -343,6 +343,13 @@ impl ConcreteDataType { } } + pub fn as_json(&self) -> Option { + match self { + ConcreteDataType::Json(j) => Some(*j), + _ => None, + } + } + pub fn as_vector(&self) -> Option { match self { ConcreteDataType::Vector(v) => Some(*v), diff --git a/src/datatypes/src/types.rs b/src/datatypes/src/types.rs index 4e991c1868..af7016ff82 100644 --- a/src/datatypes/src/types.rs +++ b/src/datatypes/src/types.rs @@ -44,7 +44,9 @@ pub use duration_type::{ pub use interval_type::{ IntervalDayTimeType, IntervalMonthDayNanoType, IntervalType, IntervalYearMonthType, }; -pub use json_type::{JsonType, JSON_TYPE_NAME}; +pub use json_type::{ + json_type_value_to_string, parse_string_to_json_type_value, JsonType, JSON_TYPE_NAME, +}; pub use list_type::ListType; pub use null_type::NullType; pub use primitive_type::{ diff --git a/src/datatypes/src/types/json_type.rs b/src/datatypes/src/types/json_type.rs index 416b59b5c0..0c004e2d44 100644 --- a/src/datatypes/src/types/json_type.rs +++ b/src/datatypes/src/types/json_type.rs @@ -12,13 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use arrow::datatypes::DataType as ArrowDataType; use common_base::bytes::Bytes; use serde::{Deserialize, Serialize}; -use crate::data_type::{DataType, DataTypeRef}; +use crate::data_type::DataType; +use crate::error::{InvalidJsonSnafu, Result}; use crate::scalars::ScalarVectorBuilder; use crate::type_id::LogicalTypeId; use crate::value::Value; @@ -26,14 +25,29 @@ use crate::vectors::{BinaryVectorBuilder, MutableVector}; pub const JSON_TYPE_NAME: &str = "Json"; +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +pub enum JsonFormat { + Jsonb, +} + +impl Default for JsonFormat { + fn default() -> Self { + Self::Jsonb + } +} + /// JsonType is a data type for JSON data. It is stored as binary data of jsonb format. /// It utilizes current binary value and vector implementation. -#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] -pub struct JsonType; +#[derive( + Debug, Default, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, +)] +pub struct JsonType { + pub format: JsonFormat, +} impl JsonType { - pub fn arc() -> DataTypeRef { - Arc::new(Self) + pub fn new(format: JsonFormat) -> Self { + Self { format } } } @@ -65,3 +79,19 @@ impl DataType for JsonType { } } } + +/// Converts a json type value to string +pub fn json_type_value_to_string(val: &[u8], format: &JsonFormat) -> Result { + match format { + JsonFormat::Jsonb => Ok(jsonb::to_string(val)), + } +} + +/// Parses a string to a json type value +pub fn parse_string_to_json_type_value(s: &str, format: &JsonFormat) -> Result> { + match format { + JsonFormat::Jsonb => jsonb::parse_value(s.as_bytes()) + .map_err(|_| InvalidJsonSnafu { value: s }.build()) + .map(|json| json.to_vec()), + } +} diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 299a345578..c6dfcbe6c7 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -398,6 +398,7 @@ fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataTyp match (column_type, value_type) { (ct, vt) if ct == vt => true, (ColumnDataType::Vector, ColumnDataType::Binary) => true, + (ColumnDataType::Json, ColumnDataType::Binary) => true, _ => false, } } diff --git a/src/operator/src/req_convert/common.rs b/src/operator/src/req_convert/common.rs index 2822f1dbec..518f2d55fa 100644 --- a/src/operator/src/req_convert/common.rs +++ b/src/operator/src/req_convert/common.rs @@ -90,7 +90,7 @@ fn prepare_rows(rows: &mut Option) -> Result<()> { column.datatype_extension = Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), }); - column.datatype = ColumnDataType::Binary.into(); + column.datatype = ColumnDataType::Json.into(); } for idx in &indexes { diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs index 6e46a2b652..90faa171de 100644 --- a/src/servers/src/mysql/writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -21,7 +21,7 @@ use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; use common_telemetry::{debug, error}; use datatypes::prelude::{ConcreteDataType, Value}; use datatypes::schema::SchemaRef; -use datatypes::types::vector_type_value_to_string; +use datatypes::types::{json_type_value_to_string, vector_type_value_to_string}; use futures::StreamExt; use opensrv_mysql::{ Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter, @@ -212,8 +212,10 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { Value::Float64(v) => row_writer.write_col(v.0)?, Value::String(v) => row_writer.write_col(v.as_utf8())?, Value::Binary(v) => match column.data_type { - ConcreteDataType::Json(_) => { - row_writer.write_col(jsonb::to_string(&v))?; + ConcreteDataType::Json(j) => { + let s = json_type_value_to_string(&v, &j.format) + .context(ConvertSqlValueSnafu)?; + row_writer.write_col(s)?; } ConcreteDataType::Vector(d) => { let s = vector_type_value_to_string(&v, d.dim) diff --git a/src/servers/src/postgres/types.rs b/src/servers/src/postgres/types.rs index a26d803b80..b5a3319766 100644 --- a/src/servers/src/postgres/types.rs +++ b/src/servers/src/postgres/types.rs @@ -27,7 +27,9 @@ use datafusion_expr::LogicalPlan; use datatypes::arrow::datatypes::DataType as ArrowDataType; use datatypes::prelude::{ConcreteDataType, Value}; use datatypes::schema::Schema; -use datatypes::types::{vector_type_value_to_string, IntervalType, TimestampType}; +use datatypes::types::{ + json_type_value_to_string, vector_type_value_to_string, IntervalType, TimestampType, +}; use datatypes::value::ListValue; use pgwire::api::portal::{Format, Portal}; use pgwire::api::results::{DataRowEncoder, FieldInfo}; @@ -350,13 +352,17 @@ fn encode_array( .collect::>>>()?; builder.encode_field(&array) } - &ConcreteDataType::Json(_) => { + &ConcreteDataType::Json(j) => { let array = value_list .items() .iter() .map(|v| match v { Value::Null => Ok(None), - Value::Binary(v) => Ok(Some(jsonb::to_string(v))), + Value::Binary(v) => { + let s = json_type_value_to_string(v, &j.format) + .map_err(|e| PgWireError::ApiError(Box::new(e)))?; + Ok(Some(s)) + } _ => Err(PgWireError::ApiError(Box::new(Error::Internal { err_msg: format!("Invalid list item type, find {v:?}, expected json",), }))), @@ -412,7 +418,11 @@ pub(super) fn encode_value( Value::Float64(v) => builder.encode_field(&v.0), Value::String(v) => builder.encode_field(&v.as_utf8()), Value::Binary(v) => match datatype { - ConcreteDataType::Json(_) => builder.encode_field(&jsonb::to_string(v)), + ConcreteDataType::Json(j) => { + let s = json_type_value_to_string(v, &j.format) + .map_err(|e| PgWireError::ApiError(Box::new(e)))?; + builder.encode_field(&s) + } ConcreteDataType::Vector(d) => { let s = vector_type_value_to_string(v, d.dim) .map_err(|e| PgWireError::ApiError(Box::new(e)))?; diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index fd2cb3dee2..bb0844a469 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -42,7 +42,9 @@ use common_time::Timestamp; use datatypes::prelude::ConcreteDataType; use datatypes::schema::constraint::{CURRENT_TIMESTAMP, CURRENT_TIMESTAMP_FN}; use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, COMMENT_KEY}; -use datatypes::types::{cast, parse_string_to_vector_type_value, TimestampType}; +use datatypes::types::{ + cast, parse_string_to_json_type_value, parse_string_to_vector_type_value, TimestampType, +}; use datatypes::value::{OrderedF32, OrderedF64, Value}; use snafu::{ensure, OptionExt, ResultExt}; use sqlparser::ast::{ExactNumberInfo, Ident, ObjectName, UnaryOperator}; @@ -126,15 +128,9 @@ fn parse_string_to_value( } } ConcreteDataType::Binary(_) => Ok(Value::Binary(s.as_bytes().into())), - ConcreteDataType::Json(_) => { - if let Ok(json) = jsonb::parse_value(s.as_bytes()) { - Ok(Value::Binary(json.to_vec().into())) - } else { - ParseSqlValueSnafu { - msg: format!("Failed to parse {s} to Json value"), - } - .fail() - } + ConcreteDataType::Json(j) => { + let v = parse_string_to_json_type_value(&s, &j.format).context(DatatypeSnafu)?; + Ok(Value::Binary(v.into())) } ConcreteDataType::Vector(d) => { let v = parse_string_to_vector_type_value(&s, d.dim).context(DatatypeSnafu)?;