refactor: support distinct JSON format and improve type conversions (#4979)

refactor: support distinct JSON format
This commit is contained in:
Weny Xu
2024-11-13 11:03:51 +08:00
committed by GitHub
parent c47ad548a4
commit 3bf9981aab
9 changed files with 79 additions and 37 deletions

View File

@@ -268,7 +268,7 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
ConcreteDataType::UInt64(_) => ColumnDataType::Uint64,
ConcreteDataType::Float32(_) => ColumnDataType::Float32,
ConcreteDataType::Float64(_) => ColumnDataType::Float64,
ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) => ColumnDataType::Binary,
ConcreteDataType::Binary(_) => ColumnDataType::Binary,
ConcreteDataType::String(_) => ColumnDataType::String,
ConcreteDataType::Date(_) => ColumnDataType::Date,
ConcreteDataType::DateTime(_) => ColumnDataType::Datetime,
@@ -290,6 +290,7 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
IntervalType::MonthDayNano(_) => ColumnDataType::IntervalMonthDayNano,
},
ConcreteDataType::Decimal128(_) => ColumnDataType::Decimal128,
ConcreteDataType::Json(_) => ColumnDataType::Json,
ConcreteDataType::Vector(_) => ColumnDataType::Vector,
ConcreteDataType::Null(_)
| ConcreteDataType::List(_)
@@ -309,16 +310,9 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
})),
})
}
ColumnDataType::Binary => {
if datatype == ConcreteDataType::json_datatype() {
// Json is the same as binary in proto. The extension marks the binary in proto is actually a json.
Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
})
} else {
None
}
}
ColumnDataType::Json => datatype.as_json().map(|_| ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
}),
ColumnDataType::Vector => {
datatype
.as_vector()

View File

@@ -343,6 +343,13 @@ impl ConcreteDataType {
}
}
pub fn as_json(&self) -> Option<JsonType> {
match self {
ConcreteDataType::Json(j) => Some(*j),
_ => None,
}
}
pub fn as_vector(&self) -> Option<VectorType> {
match self {
ConcreteDataType::Vector(v) => Some(*v),

View File

@@ -44,7 +44,9 @@ pub use duration_type::{
pub use interval_type::{
IntervalDayTimeType, IntervalMonthDayNanoType, IntervalType, IntervalYearMonthType,
};
pub use json_type::{JsonType, JSON_TYPE_NAME};
pub use json_type::{
json_type_value_to_string, parse_string_to_json_type_value, JsonType, JSON_TYPE_NAME,
};
pub use list_type::ListType;
pub use null_type::NullType;
pub use primitive_type::{

View File

@@ -12,13 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use arrow::datatypes::DataType as ArrowDataType;
use common_base::bytes::Bytes;
use serde::{Deserialize, Serialize};
use crate::data_type::{DataType, DataTypeRef};
use crate::data_type::DataType;
use crate::error::{InvalidJsonSnafu, Result};
use crate::scalars::ScalarVectorBuilder;
use crate::type_id::LogicalTypeId;
use crate::value::Value;
@@ -26,14 +25,29 @@ use crate::vectors::{BinaryVectorBuilder, MutableVector};
pub const JSON_TYPE_NAME: &str = "Json";
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub enum JsonFormat {
Jsonb,
}
impl Default for JsonFormat {
fn default() -> Self {
Self::Jsonb
}
}
/// JsonType is a data type for JSON data. It is stored as binary data of jsonb format.
/// It utilizes current binary value and vector implementation.
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct JsonType;
#[derive(
Debug, Default, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize,
)]
pub struct JsonType {
pub format: JsonFormat,
}
impl JsonType {
pub fn arc() -> DataTypeRef {
Arc::new(Self)
pub fn new(format: JsonFormat) -> Self {
Self { format }
}
}
@@ -65,3 +79,19 @@ impl DataType for JsonType {
}
}
}
/// Converts a json type value to string
pub fn json_type_value_to_string(val: &[u8], format: &JsonFormat) -> Result<String> {
match format {
JsonFormat::Jsonb => Ok(jsonb::to_string(val)),
}
}
/// Parses a string to a json type value
pub fn parse_string_to_json_type_value(s: &str, format: &JsonFormat) -> Result<Vec<u8>> {
match format {
JsonFormat::Jsonb => jsonb::parse_value(s.as_bytes())
.map_err(|_| InvalidJsonSnafu { value: s }.build())
.map(|json| json.to_vec()),
}
}

View File

@@ -398,6 +398,7 @@ fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataTyp
match (column_type, value_type) {
(ct, vt) if ct == vt => true,
(ColumnDataType::Vector, ColumnDataType::Binary) => true,
(ColumnDataType::Json, ColumnDataType::Binary) => true,
_ => false,
}
}

View File

@@ -90,7 +90,7 @@ fn prepare_rows(rows: &mut Option<Rows>) -> Result<()> {
column.datatype_extension = Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
});
column.datatype = ColumnDataType::Binary.into();
column.datatype = ColumnDataType::Json.into();
}
for idx in &indexes {

View File

@@ -21,7 +21,7 @@ use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::{debug, error};
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::schema::SchemaRef;
use datatypes::types::vector_type_value_to_string;
use datatypes::types::{json_type_value_to_string, vector_type_value_to_string};
use futures::StreamExt;
use opensrv_mysql::{
Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter,
@@ -212,8 +212,10 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
Value::Float64(v) => row_writer.write_col(v.0)?,
Value::String(v) => row_writer.write_col(v.as_utf8())?,
Value::Binary(v) => match column.data_type {
ConcreteDataType::Json(_) => {
row_writer.write_col(jsonb::to_string(&v))?;
ConcreteDataType::Json(j) => {
let s = json_type_value_to_string(&v, &j.format)
.context(ConvertSqlValueSnafu)?;
row_writer.write_col(s)?;
}
ConcreteDataType::Vector(d) => {
let s = vector_type_value_to_string(&v, d.dim)

View File

@@ -27,7 +27,9 @@ use datafusion_expr::LogicalPlan;
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::schema::Schema;
use datatypes::types::{vector_type_value_to_string, IntervalType, TimestampType};
use datatypes::types::{
json_type_value_to_string, vector_type_value_to_string, IntervalType, TimestampType,
};
use datatypes::value::ListValue;
use pgwire::api::portal::{Format, Portal};
use pgwire::api::results::{DataRowEncoder, FieldInfo};
@@ -350,13 +352,17 @@ fn encode_array(
.collect::<PgWireResult<Vec<Option<String>>>>()?;
builder.encode_field(&array)
}
&ConcreteDataType::Json(_) => {
&ConcreteDataType::Json(j) => {
let array = value_list
.items()
.iter()
.map(|v| match v {
Value::Null => Ok(None),
Value::Binary(v) => Ok(Some(jsonb::to_string(v))),
Value::Binary(v) => {
let s = json_type_value_to_string(v, &j.format)
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
Ok(Some(s))
}
_ => Err(PgWireError::ApiError(Box::new(Error::Internal {
err_msg: format!("Invalid list item type, find {v:?}, expected json",),
}))),
@@ -412,7 +418,11 @@ pub(super) fn encode_value(
Value::Float64(v) => builder.encode_field(&v.0),
Value::String(v) => builder.encode_field(&v.as_utf8()),
Value::Binary(v) => match datatype {
ConcreteDataType::Json(_) => builder.encode_field(&jsonb::to_string(v)),
ConcreteDataType::Json(j) => {
let s = json_type_value_to_string(v, &j.format)
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
builder.encode_field(&s)
}
ConcreteDataType::Vector(d) => {
let s = vector_type_value_to_string(v, d.dim)
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;

View File

@@ -42,7 +42,9 @@ use common_time::Timestamp;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::constraint::{CURRENT_TIMESTAMP, CURRENT_TIMESTAMP_FN};
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, COMMENT_KEY};
use datatypes::types::{cast, parse_string_to_vector_type_value, TimestampType};
use datatypes::types::{
cast, parse_string_to_json_type_value, parse_string_to_vector_type_value, TimestampType,
};
use datatypes::value::{OrderedF32, OrderedF64, Value};
use snafu::{ensure, OptionExt, ResultExt};
use sqlparser::ast::{ExactNumberInfo, Ident, ObjectName, UnaryOperator};
@@ -126,15 +128,9 @@ fn parse_string_to_value(
}
}
ConcreteDataType::Binary(_) => Ok(Value::Binary(s.as_bytes().into())),
ConcreteDataType::Json(_) => {
if let Ok(json) = jsonb::parse_value(s.as_bytes()) {
Ok(Value::Binary(json.to_vec().into()))
} else {
ParseSqlValueSnafu {
msg: format!("Failed to parse {s} to Json value"),
}
.fail()
}
ConcreteDataType::Json(j) => {
let v = parse_string_to_json_type_value(&s, &j.format).context(DatatypeSnafu)?;
Ok(Value::Binary(v.into()))
}
ConcreteDataType::Vector(d) => {
let v = parse_string_to_vector_type_value(&s, d.dim).context(DatatypeSnafu)?;