mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 04:12:55 +00:00
feat: merge json datatype (#7142)
* feat: merge json datatype Signed-off-by: luofucong <luofc@foxmail.com> * resolve PR comments Signed-off-by: luofucong <luofc@foxmail.com> --------- Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
@@ -348,9 +348,9 @@ impl ConcreteDataType {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn as_json(&self) -> Option<JsonType> {
|
||||
pub fn as_json(&self) -> Option<&JsonType> {
|
||||
match self {
|
||||
ConcreteDataType::Json(j) => Some(j.clone()),
|
||||
ConcreteDataType::Json(j) => Some(j),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -259,6 +259,13 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to merge JSON datatype: {reason}"))]
|
||||
MergeJsonDatatype {
|
||||
reason: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
impl ErrorExt for Error {
|
||||
@@ -281,7 +288,8 @@ impl ErrorExt for Error {
|
||||
| InvalidJsonb { .. }
|
||||
| InvalidVector { .. }
|
||||
| InvalidFulltextOption { .. }
|
||||
| InvalidSkippingIndexOption { .. } => StatusCode::InvalidArguments,
|
||||
| InvalidSkippingIndexOption { .. }
|
||||
| MergeJsonDatatype { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
ValueExceedsPrecision { .. }
|
||||
| CastType { .. }
|
||||
|
||||
@@ -30,7 +30,7 @@ use snafu::{ResultExt, ensure};
|
||||
|
||||
use crate::data_type::{ConcreteDataType, DataType};
|
||||
use crate::error::{self, Error};
|
||||
use crate::types::{StructField, StructType};
|
||||
use crate::types::{ListType, StructField, StructType};
|
||||
use crate::value::{ListValue, StructValue, Value};
|
||||
|
||||
/// The configuration of JSON encoding
|
||||
@@ -375,8 +375,8 @@ fn encode_json_value_with_context<'a>(
|
||||
}
|
||||
Json::Array(arr) => {
|
||||
let list_value = encode_json_array_with_context(arr, expected_type, context)?;
|
||||
let data_type = list_value.datatype().clone();
|
||||
Ok((Value::List(list_value), (*data_type).clone()))
|
||||
let datatype = ConcreteDataType::List(ListType::new(list_value.datatype()));
|
||||
Ok((Value::List(list_value), datatype))
|
||||
}
|
||||
Json::Object(obj) => {
|
||||
let struct_value = encode_json_object_with_context(obj, None, context)?;
|
||||
|
||||
@@ -12,7 +12,9 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::datatypes::DataType as ArrowDataType;
|
||||
use arrow_schema::Fields;
|
||||
@@ -21,10 +23,13 @@ use serde::{Deserialize, Serialize};
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::data_type::DataType;
|
||||
use crate::error::{DeserializeSnafu, InvalidJsonSnafu, InvalidJsonbSnafu, Result};
|
||||
use crate::error::{
|
||||
DeserializeSnafu, InvalidJsonSnafu, InvalidJsonbSnafu, MergeJsonDatatypeSnafu, Result,
|
||||
};
|
||||
use crate::prelude::ConcreteDataType;
|
||||
use crate::scalars::ScalarVectorBuilder;
|
||||
use crate::type_id::LogicalTypeId;
|
||||
use crate::types::{ListType, StructField, StructType};
|
||||
use crate::value::Value;
|
||||
use crate::vectors::{BinaryVectorBuilder, MutableVector};
|
||||
|
||||
@@ -48,11 +53,101 @@ impl JsonType {
|
||||
pub fn new(format: JsonFormat) -> Self {
|
||||
Self { format }
|
||||
}
|
||||
|
||||
// TODO(LFC): remove "allow unused"
|
||||
#[allow(unused)]
|
||||
/// Make json type a struct type, by:
|
||||
/// - if the json is an object, its entries are mapped to struct fields, obviously;
|
||||
/// - if not, the json is one of bool, number, string or array, make it a special field called
|
||||
/// "__plain" in a struct with only that field.
|
||||
pub(crate) fn as_struct_type(&self) -> StructType {
|
||||
match &self.format {
|
||||
JsonFormat::Jsonb => StructType::default(),
|
||||
JsonFormat::Native(inner) => match inner.as_ref() {
|
||||
ConcreteDataType::Struct(t) => t.clone(),
|
||||
x => StructType::new(Arc::new(vec![StructField::new(
|
||||
"__plain".to_string(),
|
||||
x.clone(),
|
||||
true,
|
||||
)])),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(LFC): remove "allow unused"
|
||||
#[allow(unused)]
|
||||
/// Try to merge this json type with others, error on datatype conflict.
|
||||
pub(crate) fn merge(&mut self, other: &JsonType) -> Result<()> {
|
||||
match (&self.format, &other.format) {
|
||||
(JsonFormat::Jsonb, JsonFormat::Jsonb) => Ok(()),
|
||||
(JsonFormat::Native(this), JsonFormat::Native(that)) => {
|
||||
let merged = merge(this.as_ref(), that.as_ref())?;
|
||||
self.format = JsonFormat::Native(Box::new(merged));
|
||||
Ok(())
|
||||
}
|
||||
_ => MergeJsonDatatypeSnafu {
|
||||
reason: "json format not match",
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn merge(this: &ConcreteDataType, that: &ConcreteDataType) -> Result<ConcreteDataType> {
|
||||
match (this, that) {
|
||||
(this, that) if this == that => Ok(this.clone()),
|
||||
(ConcreteDataType::List(this), ConcreteDataType::List(that)) => {
|
||||
merge_list(this, that).map(ConcreteDataType::List)
|
||||
}
|
||||
(ConcreteDataType::Struct(this), ConcreteDataType::Struct(that)) => {
|
||||
merge_struct(this, that).map(ConcreteDataType::Struct)
|
||||
}
|
||||
(ConcreteDataType::Null(_), x) | (x, ConcreteDataType::Null(_)) => Ok(x.clone()),
|
||||
_ => MergeJsonDatatypeSnafu {
|
||||
reason: format!("datatypes have conflict, this: {this}, that: {that}"),
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
fn merge_list(this: &ListType, that: &ListType) -> Result<ListType> {
|
||||
let merged = merge(this.item_type(), that.item_type())?;
|
||||
Ok(ListType::new(Arc::new(merged)))
|
||||
}
|
||||
|
||||
fn merge_struct(this: &StructType, that: &StructType) -> Result<StructType> {
|
||||
let this = Arc::unwrap_or_clone(this.fields());
|
||||
let that = Arc::unwrap_or_clone(that.fields());
|
||||
|
||||
let mut this: BTreeMap<String, StructField> = this
|
||||
.into_iter()
|
||||
.map(|x| (x.name().to_string(), x))
|
||||
.collect();
|
||||
// merge "that" into "this" directly:
|
||||
for that_field in that {
|
||||
let field_name = that_field.name().to_string();
|
||||
if let Some(this_field) = this.get(&field_name) {
|
||||
let merged_field = StructField::new(
|
||||
field_name.clone(),
|
||||
merge(this_field.data_type(), that_field.data_type())?,
|
||||
true, // the value in json object must be always nullable
|
||||
);
|
||||
this.insert(field_name, merged_field);
|
||||
} else {
|
||||
this.insert(field_name, that_field);
|
||||
}
|
||||
}
|
||||
|
||||
let fields = this.into_values().collect::<Vec<_>>();
|
||||
Ok(StructType::new(Arc::new(fields)))
|
||||
}
|
||||
|
||||
impl DataType for JsonType {
|
||||
fn name(&self) -> String {
|
||||
JSON_TYPE_NAME.to_string()
|
||||
match &self.format {
|
||||
JsonFormat::Jsonb => JSON_TYPE_NAME.to_string(),
|
||||
JsonFormat::Native(x) => format!("Json<{x}>"),
|
||||
}
|
||||
}
|
||||
|
||||
fn logical_type_id(&self) -> LogicalTypeId {
|
||||
@@ -106,3 +201,95 @@ pub fn parse_string_to_jsonb(s: &str) -> Result<Vec<u8>> {
|
||||
.map_err(|_| InvalidJsonSnafu { value: s }.build())
|
||||
.map(|json| json.to_vec())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::json::JsonStructureSettings;
|
||||
|
||||
#[test]
|
||||
fn test_merge_json_type() -> Result<()> {
|
||||
fn test(
|
||||
json: &str,
|
||||
json_type: &mut JsonType,
|
||||
expected: std::result::Result<&str, &str>,
|
||||
) -> Result<()> {
|
||||
let json: serde_json::Value = serde_json::from_str(json).unwrap();
|
||||
|
||||
let settings = JsonStructureSettings::Structured(None);
|
||||
let value = settings.encode(json)?;
|
||||
let value_type = value.data_type();
|
||||
let Some(other) = value_type.as_json() else {
|
||||
unreachable!()
|
||||
};
|
||||
|
||||
let result = json_type.merge(other);
|
||||
match (result, expected) {
|
||||
(Ok(()), Ok(expected)) => {
|
||||
assert_eq!(json_type.name(), expected)
|
||||
}
|
||||
(Err(err), Err(expected)) => {
|
||||
assert_eq!(err.to_string(), expected)
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
let json_type = &mut JsonType::new(JsonFormat::Native(Box::new(
|
||||
ConcreteDataType::null_datatype(),
|
||||
)));
|
||||
|
||||
// can merge with json object:
|
||||
let json = r#"{
|
||||
"hello": "world",
|
||||
"list": [1, 2, 3],
|
||||
"object": {"a": 1}
|
||||
}"#;
|
||||
let expected =
|
||||
r#"Json<Struct<"hello": String, "list": List<Int64>, "object": Struct<"a": Int64>>>"#;
|
||||
test(json, json_type, Ok(expected))?;
|
||||
|
||||
// cannot merge with other non-object json values:
|
||||
let jsons = [r#""s""#, "1", "[1]"];
|
||||
let expects = [
|
||||
r#"Failed to merge JSON datatype: datatypes have conflict, this: Struct<"hello": String, "list": List<Int64>, "object": Struct<"a": Int64>>, that: String"#,
|
||||
r#"Failed to merge JSON datatype: datatypes have conflict, this: Struct<"hello": String, "list": List<Int64>, "object": Struct<"a": Int64>>, that: Int64"#,
|
||||
r#"Failed to merge JSON datatype: datatypes have conflict, this: Struct<"hello": String, "list": List<Int64>, "object": Struct<"a": Int64>>, that: List<Int64>"#,
|
||||
];
|
||||
for (json, expect) in jsons.into_iter().zip(expects.into_iter()) {
|
||||
test(json, json_type, Err(expect))?;
|
||||
}
|
||||
|
||||
// cannot merge with other json object with conflict field datatype:
|
||||
let json = r#"{
|
||||
"hello": 1,
|
||||
"float": 0.123,
|
||||
"no": 42
|
||||
}"#;
|
||||
let expected =
|
||||
r#"Failed to merge JSON datatype: datatypes have conflict, this: String, that: Int64"#;
|
||||
test(json, json_type, Err(expected))?;
|
||||
|
||||
// can merge with another json object:
|
||||
let json = r#"{
|
||||
"hello": "greptime",
|
||||
"float": 0.123,
|
||||
"int": 42
|
||||
}"#;
|
||||
let expected = r#"Json<Struct<"float": Float64, "hello": String, "int": Int64, "list": List<Int64>, "object": Struct<"a": Int64>>>"#;
|
||||
test(json, json_type, Ok(expected))?;
|
||||
|
||||
// can merge with some complex nested json object:
|
||||
let json = r#"{
|
||||
"list": [4],
|
||||
"object": {"foo": "bar", "l": ["x"], "o": {"key": "value"}},
|
||||
"float": 0.456,
|
||||
"int": 0
|
||||
}"#;
|
||||
let expected = r#"Json<Struct<"float": Float64, "hello": String, "int": Int64, "list": List<Int64>, "object": Struct<"a": Int64, "foo": String, "l": List<String>, "o": Struct<"key": String>>>>"#;
|
||||
test(json, json_type, Ok(expected))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,7 +52,7 @@ impl DataType for StructType {
|
||||
"Struct<{}>",
|
||||
self.fields
|
||||
.iter()
|
||||
.map(|f| f.name())
|
||||
.map(|f| format!(r#""{}": {}"#, f.name(), f.data_type()))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ")
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user