feat: add json data type (#4619)

* feat: add json type and vector

* fix: allow to create and insert json data

* feat: udf to query json as string

* refactor: remove JsonbValue and JsonVector

* feat: show json value as strings

* chore: make ci happy

* test: adunit test and sqlness test

* refactor: use binary as grpc value of json

* fix: use non-preserve-order jsonb

* test: revert changed test

* refactor: change udf get_by_path to jq

* chore: make ci happy

* fix: distinguish binary and json in proto

* chore: delete udf for future pr

* refactor: remove Value(Json)

* chore: follow review comments

* test: some tests and checks

* test: fix unit tests

* chore: follow review comments

* chore: corresponding changes to proto

* fix: change grpc and pgsql server behavior alongside with sqlness/crud tests

* chore: follow review comments

* feat: udf of conversions between json and strings, used for grpc server

* refactor: rename to_string to json_to_string

* test: add more sqlness test for json

* chore: thanks for review :)

* Apply suggestions from code review

---------

Co-authored-by: Weny Xu <wenymedia@gmail.com>
This commit is contained in:
Yohan Wal
2024-09-09 19:41:36 +08:00
committed by GitHub
parent dc89944570
commit 04e7dd6fd5
30 changed files with 1076 additions and 61 deletions

View File

@@ -58,6 +58,7 @@ humantime-serde.workspace = true
hyper = { version = "0.14", features = ["full"] }
influxdb_line_protocol = { git = "https://github.com/evenyag/influxdb_iox", branch = "feat/line-protocol" }
itertools.workspace = true
jsonb.workspace = true
lazy_static.workspace = true
mime_guess = "2.0"
notify.workspace = true
@@ -70,7 +71,7 @@ parking_lot = "0.12"
pgwire = "0.20"
pin-project = "1.0"
pipeline.workspace = true
postgres-types = { version = "0.2", features = ["with-chrono-0_4"] }
postgres-types = { version = "0.2", features = ["with-chrono-0_4", "with-serde_json-1"] }
pprof = { version = "0.13", features = [
"flamegraph",
"prost-codec",

View File

@@ -168,6 +168,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
&mut row_writer,
&record_batch,
query_context.clone(),
&column_def,
)
.await?
}
@@ -191,9 +192,10 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
row_writer: &mut RowWriter<'_, W>,
recordbatch: &RecordBatch,
query_context: QueryContextRef,
column_def: &[Column],
) -> Result<()> {
for row in recordbatch.rows() {
for value in row.into_iter() {
for (value, column) in row.into_iter().zip(column_def.iter()) {
match value {
Value::Null => row_writer.write_col(None::<u8>)?,
Value::Boolean(v) => row_writer.write_col(v as i8)?,
@@ -208,7 +210,14 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
Value::Float32(v) => row_writer.write_col(v.0)?,
Value::Float64(v) => row_writer.write_col(v.0)?,
Value::String(v) => row_writer.write_col(v.as_utf8())?,
Value::Binary(v) => row_writer.write_col(v.deref())?,
Value::Binary(v) => match column.coltype {
ColumnType::MYSQL_TYPE_JSON => {
row_writer.write_col(jsonb::to_string(&v))?;
}
_ => {
row_writer.write_col(v.deref())?;
}
},
Value::Date(v) => row_writer.write_col(v.to_chrono_date())?,
// convert datetime and timestamp to timezone of current connection
Value::DateTime(v) => row_writer.write_col(
@@ -281,6 +290,7 @@ pub(crate) fn create_mysql_column(
ConcreteDataType::Interval(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
ConcreteDataType::Duration(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
ConcreteDataType::Decimal128(_) => Ok(ColumnType::MYSQL_TYPE_DECIMAL),
ConcreteDataType::Json(_) => Ok(ColumnType::MYSQL_TYPE_JSON),
_ => error::UnsupportedDataTypeSnafu {
data_type,
reason: "not implemented",

View File

@@ -150,8 +150,8 @@ where
.map(move |row| {
row.and_then(|row| {
let mut encoder = DataRowEncoder::new(pg_schema_ref.clone());
for value in row.iter() {
encode_value(&query_ctx, value, &mut encoder)?;
for (value, column) in row.iter().zip(schema.column_schemas()) {
encode_value(&query_ctx, value, &mut encoder, &column.data_type)?;
}
encoder.finish()
})

View File

@@ -62,6 +62,7 @@ pub(super) fn encode_value(
query_ctx: &QueryContextRef,
value: &Value,
builder: &mut DataRowEncoder,
datatype: &ConcreteDataType,
) -> PgWireResult<()> {
match value {
Value::Null => builder.encode_field(&None::<&i8>),
@@ -77,13 +78,18 @@ pub(super) fn encode_value(
Value::Float32(v) => builder.encode_field(&v.0),
Value::Float64(v) => builder.encode_field(&v.0),
Value::String(v) => builder.encode_field(&v.as_utf8()),
Value::Binary(v) => {
let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output();
match *bytea_output {
PGByteaOutputValue::ESCAPE => builder.encode_field(&EscapeOutputBytea(v.deref())),
PGByteaOutputValue::HEX => builder.encode_field(&HexOutputBytea(v.deref())),
Value::Binary(v) => match datatype {
ConcreteDataType::Json(_) => builder.encode_field(&jsonb::to_string(v)),
_ => {
let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output();
match *bytea_output {
PGByteaOutputValue::ESCAPE => {
builder.encode_field(&EscapeOutputBytea(v.deref()))
}
PGByteaOutputValue::HEX => builder.encode_field(&HexOutputBytea(v.deref())),
}
}
}
},
Value::Date(v) => {
if let Some(date) = v.to_chrono_date() {
let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
@@ -154,6 +160,7 @@ pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
&ConcreteDataType::Time(_) => Ok(Type::TIME),
&ConcreteDataType::Interval(_) => Ok(Type::INTERVAL),
&ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC),
&ConcreteDataType::Json(_) => Ok(Type::JSON),
&ConcreteDataType::Duration(_)
| &ConcreteDataType::List(_)
| &ConcreteDataType::Dictionary(_) => server_error::UnsupportedDataTypeSnafu {
@@ -549,6 +556,23 @@ pub(super) fn parameters_to_scalar_values(
}
}
}
&Type::JSONB => {
let data = portal.parameter::<serde_json::Value>(idx, &client_type)?;
match server_type {
ConcreteDataType::Binary(_) => {
ScalarValue::Binary(data.map(|d| jsonb::Value::from(d).to_vec()))
}
_ => {
return Err(invalid_parameter_error(
"invalid_parameter_type",
Some(&format!(
"Expected: {}, found: {}",
server_type, client_type
)),
));
}
}
}
_ => Err(invalid_parameter_error(
"unsupported_parameter_value",
Some(&format!("Found type: {}", client_type)),
@@ -581,6 +605,8 @@ pub(super) fn param_types_to_pg_types(
mod test {
use std::sync::Arc;
use common_time::interval::IntervalUnit;
use common_time::timestamp::TimeUnit;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::value::ListValue;
use pgwire::api::results::{FieldFormat, FieldInfo};
@@ -778,6 +804,35 @@ mod test {
),
];
let datatypes = vec![
ConcreteDataType::null_datatype(),
ConcreteDataType::boolean_datatype(),
ConcreteDataType::uint8_datatype(),
ConcreteDataType::uint16_datatype(),
ConcreteDataType::uint32_datatype(),
ConcreteDataType::uint64_datatype(),
ConcreteDataType::int8_datatype(),
ConcreteDataType::int8_datatype(),
ConcreteDataType::int16_datatype(),
ConcreteDataType::int16_datatype(),
ConcreteDataType::int32_datatype(),
ConcreteDataType::int32_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::float32_datatype(),
ConcreteDataType::float32_datatype(),
ConcreteDataType::float32_datatype(),
ConcreteDataType::float64_datatype(),
ConcreteDataType::float64_datatype(),
ConcreteDataType::float64_datatype(),
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
ConcreteDataType::date_datatype(),
ConcreteDataType::time_datatype(TimeUnit::Second),
ConcreteDataType::datetime_datatype(),
ConcreteDataType::timestamp_datatype(TimeUnit::Second),
ConcreteDataType::interval_datatype(IntervalUnit::YearMonth),
];
let values = vec![
Value::Null,
Value::Boolean(true),
@@ -812,14 +867,15 @@ mod test {
.build()
.into();
let mut builder = DataRowEncoder::new(Arc::new(schema));
for i in values.iter() {
encode_value(&query_context, i, &mut builder).unwrap();
for (value, datatype) in values.iter().zip(datatypes) {
encode_value(&query_context, value, &mut builder, &datatype).unwrap();
}
let err = encode_value(
&query_context,
&Value::List(ListValue::new(vec![], ConcreteDataType::int16_datatype())),
&mut builder,
&ConcreteDataType::list_datatype(ConcreteDataType::int16_datatype()),
)
.unwrap_err();
match err {