refactor: json conversion (#4893)
* refactor: json type update
* test: update test
* fix: convert when needed
* revert: leave sqlness tests unchanged
* fix: fmt
* refactor: just refactor
* Apply suggestions from code review

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* refactor: parse jsonb first
* test: add bad cases
* Update src/datatypes/src/vectors/binary.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* fix: fmt
* fix: fix clippy/check

---------

Co-authored-by: Weny Xu <wenymedia@gmail.com>
Cargo.lock
@@ -5524,7 +5524,7 @@ dependencies = [
 [[package]]
 name = "jsonb"
 version = "0.4.1"
-source = "git+https://github.com/datafuselabs/jsonb.git?rev=46ad50fc71cf75afbf98eec455f7892a6387c1fc#46ad50fc71cf75afbf98eec455f7892a6387c1fc"
+source = "git+https://github.com/databendlabs/jsonb.git?rev=46ad50fc71cf75afbf98eec455f7892a6387c1fc#46ad50fc71cf75afbf98eec455f7892a6387c1fc"
 dependencies = [
  "byteorder",
  "fast-float",
Cargo.toml
@@ -125,7 +125,7 @@ greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", r
 humantime = "2.1"
 humantime-serde = "1.1"
 itertools = "0.10"
-jsonb = { git = "https://github.com/datafuselabs/jsonb.git", rev = "46ad50fc71cf75afbf98eec455f7892a6387c1fc", default-features = false }
+jsonb = { git = "https://github.com/databendlabs/jsonb.git", rev = "46ad50fc71cf75afbf98eec455f7892a6387c1fc", default-features = false }
 lazy_static = "1.4"
 meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "a10facb353b41460eeb98578868ebf19c2084fac" }
 mockall = "0.11.4"
@@ -189,6 +189,13 @@ pub enum Error {
         location: Location,
     },

+    #[snafu(display("Invalid JSON text: {}", value))]
+    InvalidJson {
+        value: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Value exceeds the precision {} bound", precision))]
     ValueExceedsPrecision {
         precision: u8,
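
Note: snafu's derive generates an `InvalidJsonSnafu` context selector for the new variant, which is what the conversion code further down uses to fail with the offending text attached. A minimal self-contained sketch of how such a selector is used, with plain snafu outside this crate's error plumbing:

```rust
use snafu::{Location, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Invalid JSON text: {}", value))]
    InvalidJson {
        value: String,
        #[snafu(implicit)]
        location: Location,
    },
}

// Re-encode JSON text as JSONB bytes, failing with InvalidJson on bad input.
fn to_jsonb(text: &str) -> Result<Vec<u8>, Error> {
    match jsonb::parse_value(text.as_bytes()) {
        Ok(value) => Ok(value.to_vec()),
        Err(_) => InvalidJsonSnafu { value: text }.fail(),
    }
}
```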
@@ -222,7 +229,8 @@ impl ErrorExt for Error {
             | DefaultValueType { .. }
             | DuplicateMeta { .. }
             | InvalidTimestampPrecision { .. }
-            | InvalidPrecisionOrScale { .. } => StatusCode::InvalidArguments,
+            | InvalidPrecisionOrScale { .. }
+            | InvalidJson { .. } => StatusCode::InvalidArguments,

             ValueExceedsPrecision { .. }
             | CastType { .. }
@@ -13,6 +13,7 @@
 // limitations under the License.

 #![feature(let_chains)]
+#![feature(assert_matches)]

 pub mod arrow_array;
 pub mod data_type;
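
Context for the new feature gate: `assert_matches!` still lives behind the nightly `assert_matches` std feature, which is why the crate-level attribute is added here for the new test. A small usage sketch:

```rust
#![feature(assert_matches)]
use std::assert_matches::assert_matches;

fn main() {
    let result: Result<i32, String> = Err("invalid json".to_string());
    // Passes if the value matches the pattern (guards allowed); panics with
    // both the value and the pattern otherwise.
    assert_matches!(result, Err(msg) if msg.contains("json"));
}
```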
src/datatypes/src/vectors/binary.rs
@@ -36,6 +36,36 @@ impl BinaryVector {
     pub(crate) fn as_arrow(&self) -> &dyn Array {
         &self.array
     }

+    /// Creates a new binary vector of JSONB from a binary vector.
+    /// The binary vector must contain valid JSON strings.
+    pub fn convert_binary_to_json(&self) -> Result<BinaryVector> {
+        let arrow_array = self.to_arrow_array();
+        let mut vector = vec![];
+        for binary in arrow_array
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .unwrap()
+            .iter()
+        {
+            let jsonb = if let Some(binary) = binary {
+                match jsonb::from_slice(binary) {
+                    Ok(jsonb) => Some(jsonb.to_vec()),
+                    Err(_) => {
+                        let s = String::from_utf8_lossy(binary);
+                        return error::InvalidJsonSnafu {
+                            value: s.to_string(),
+                        }
+                        .fail();
+                    }
+                }
+            } else {
+                None
+            };
+            vector.push(jsonb);
+        }
+        Ok(BinaryVector::from(vector))
+    }
 }

 impl From<BinaryArray> for BinaryVector {
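
A usage sketch for the new helper, assuming the `BinaryVector` API from the hunk above. The new test below exercises both encodings: plain JSON text is re-encoded as JSONB, while values that are already JSONB pass through unchanged.

```rust
// A minimal sketch, assuming the BinaryVector API shown above.
fn demo() -> Result<()> {
    let vector = BinaryVector::from(vec![
        Some(br#"{"hello": "world"}"#.to_vec()), // JSON text: re-encoded as JSONB
        None,                                    // nulls are preserved as nulls
    ]);
    let _jsonb_vector = vector.convert_binary_to_json()?;

    // Malformed input surfaces as Error::InvalidJson instead of bad bytes.
    let err = BinaryVector::from(vec![b"{oops".to_vec()])
        .convert_binary_to_json()
        .unwrap_err();
    assert!(matches!(err, error::Error::InvalidJson { .. }));
    Ok(())
}
```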
@@ -233,6 +263,8 @@ vectors::impl_try_from_arrow_array_for_vector!(BinaryArray, BinaryVector);

 #[cfg(test)]
 mod tests {
+    use std::assert_matches::assert_matches;
+
     use arrow::datatypes::DataType as ArrowDataType;
     use common_base::bytes::Bytes;
     use serde_json;
@@ -383,4 +415,52 @@ mod tests {
         assert_eq!(b"four", vector.get_data(3).unwrap());
         assert_eq!(builder.len(), 4);
     }

+    #[test]
+    fn test_binary_json_conversion() {
+        // json strings
+        let json_strings = vec![
+            b"{\"hello\": \"world\"}".to_vec(),
+            b"{\"foo\": 1}".to_vec(),
+            b"123".to_vec(),
+        ];
+        let json_vector = BinaryVector::from(json_strings.clone())
+            .convert_binary_to_json()
+            .unwrap();
+        let jsonbs = json_strings
+            .iter()
+            .map(|v| jsonb::parse_value(v).unwrap().to_vec())
+            .collect::<Vec<_>>();
+        for i in 0..3 {
+            assert_eq!(
+                json_vector.get_ref(i).as_binary().unwrap().unwrap(),
+                jsonbs.get(i).unwrap().as_slice()
+            );
+        }
+
+        // jsonb
+        let json_vector = BinaryVector::from(jsonbs.clone())
+            .convert_binary_to_json()
+            .unwrap();
+        for i in 0..3 {
+            assert_eq!(
+                json_vector.get_ref(i).as_binary().unwrap().unwrap(),
+                jsonbs.get(i).unwrap().as_slice()
+            );
+        }
+
+        // binary with jsonb header (0x80, 0x40, 0x20)
+        let binary_with_jsonb_header: Vec<u8> = [0x80, 0x23, 0x40, 0x22].to_vec();
+        let error = BinaryVector::from(vec![binary_with_jsonb_header])
+            .convert_binary_to_json()
+            .unwrap_err();
+        assert_matches!(error, error::Error::InvalidJson { .. });
+
+        // invalid json string
+        let json_strings = vec![b"{\"hello\": \"world\"".to_vec()];
+        let error = BinaryVector::from(json_strings)
+            .convert_binary_to_json()
+            .unwrap_err();
+        assert_matches!(error, error::Error::InvalidJson { .. });
+    }
 }
@@ -18,6 +18,8 @@ mod find_unique;
 mod replicate;
 mod take;

+use std::sync::Arc;
+
 use common_base::BitVec;

 use crate::error::{self, Result};
@@ -89,6 +91,12 @@ macro_rules! impl_scalar_vector_op {
             }

             fn cast(&self, to_type: &ConcreteDataType) -> Result<VectorRef> {
+                if to_type == &ConcreteDataType::json_datatype() {
+                    if let Some(vector) = self.as_any().downcast_ref::<BinaryVector>() {
+                        let json_vector = vector.convert_binary_to_json()?;
+                        return Ok(Arc::new(json_vector) as VectorRef);
+                    }
+                }
                 cast::cast_non_constant!(self, to_type)
             }

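
The effect, sketched with the names from the hunks above: casting a scalar vector to the JSON type now detours through `convert_binary_to_json` when the source is a `BinaryVector`, so validation and JSONB re-encoding happen at cast time instead of in each caller.

```rust
// A hedged sketch of the new cast path.
fn demo() -> Result<()> {
    let binary: VectorRef = Arc::new(BinaryVector::from(vec![br#"{"k": 1}"#.to_vec()]));

    // Previously this fell through to the generic arrow cast; now casting to
    // the JSON type validates and re-encodes each element as JSONB first.
    let _json_vector = binary.cast(&ConcreteDataType::json_datatype())?;
    Ok(())
}
```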
@@ -961,7 +961,7 @@ pub(super) fn parameters_to_scalar_values(
                 if let Some(server_type) = &server_type {
                     match server_type {
                         ConcreteDataType::Binary(_) => {
-                            ScalarValue::Binary(data.map(|d| jsonb::Value::from(d).to_vec()))
+                            ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
                         }
                         _ => {
                             return Err(invalid_parameter_error(
@@ -971,7 +971,7 @@ pub(super) fn parameters_to_scalar_values(
                     }
                 } else {
-                    ScalarValue::Binary(data.map(|d| jsonb::Value::from(d).to_vec()))
+                    ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
                 }
             }
             _ => Err(invalid_parameter_error(
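
This pairs with the cast change above (the commit's "fix: convert when needed"): JSON parameters arriving over the Postgres protocol are no longer eagerly encoded to JSONB at bind time; the JSON text is kept, and the JSONB re-encoding (with validation) happens later, only when the target column actually has the JSON type. A sketch of the two encodings, assuming a `serde_json::Value` parameter like `data` in the hunk above:

```rust
fn main() {
    let data: Option<serde_json::Value> = Some(serde_json::json!({"k": 1}));

    // Before: eager JSONB encoding at parameter-bind time.
    let _eager: Option<Vec<u8>> = data.clone().map(|d| jsonb::Value::from(d).to_vec());

    // After: keep the JSON text bytes; convert_binary_to_json re-encodes
    // them later, only for JSON-typed columns.
    let _deferred: Option<Vec<u8>> = data.map(|d| d.to_string().into_bytes());
}
```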
@@ -145,7 +145,7 @@ pub async fn test_mysql_crud(store_type: StorageType) {
     .unwrap();

     sqlx::query(
-        "create table demo(i bigint, ts timestamp time index default current_timestamp, d date default null, dt datetime default null, b blob default null)",
+        "create table demo(i bigint, ts timestamp time index default current_timestamp, d date default null, dt datetime default null, b blob default null, j json default null)",
     )
     .execute(&pool)
     .await
@@ -158,18 +158,30 @@
         let d = NaiveDate::from_yo_opt(2015, 100).unwrap();
         let hello = format!("hello{i}");
         let bytes = hello.as_bytes();
-        sqlx::query("insert into demo values(?, ?, ?, ?, ?)")
+        let jsons = serde_json::json!({
+            "code": i,
+            "success": true,
+            "payload": {
+                "features": [
+                    "serde",
+                    "json"
+                ],
+                "homepage": null
+            }
+        });
+        sqlx::query("insert into demo values(?, ?, ?, ?, ?, ?)")
             .bind(i)
             .bind(i)
             .bind(d)
             .bind(dt)
             .bind(bytes)
+            .bind(jsons)
             .execute(&pool)
             .await
             .unwrap();
     }

-    let rows = sqlx::query("select i, d, dt, b from demo")
+    let rows = sqlx::query("select i, d, dt, b, j from demo")
         .fetch_all(&pool)
         .await
         .unwrap();
@@ -180,6 +192,7 @@ pub async fn test_mysql_crud(store_type: StorageType) {
         let d: NaiveDate = row.get("d");
         let dt: DateTime<Utc> = row.get("dt");
         let bytes: Vec<u8> = row.get("b");
+        let json: serde_json::Value = row.get("j");
         assert_eq!(ret, i as i64);
         let expected_d = NaiveDate::from_yo_opt(2015, 100).unwrap();
         assert_eq!(expected_d, d);
@@ -194,6 +207,18 @@ pub async fn test_mysql_crud(store_type: StorageType) {
             format!("{}", dt.format("%Y-%m-%d %H:%M:%S"))
         );
         assert_eq!(format!("hello{i}"), String::from_utf8_lossy(&bytes));
+        let expected_j = serde_json::json!({
+            "code": i,
+            "success": true,
+            "payload": {
+                "features": [
+                    "serde",
+                    "json"
+                ],
+                "homepage": null
+            }
+        });
+        assert_eq!(json, expected_j);
     }

     let rows = sqlx::query("select i from demo where i=?")
@@ -396,7 +421,7 @@ pub async fn test_postgres_crud(store_type: StorageType) {
         let dt = d.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp_millis();
         let bytes = "hello".as_bytes();
         let json = serde_json::json!({
-            "code": 200,
+            "code": i,
             "success": true,
             "payload": {
                 "features": [
@@ -444,7 +469,7 @@
     assert_eq!("hello".as_bytes(), bytes);

     let expected_j = serde_json::json!({
-        "code": 200,
+        "code": i,
         "success": true,
         "payload": {
             "features": [