Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-05-26 16:20:28 +08:00
parent 62492fe9d7
commit a7bb2ac6b8
6 changed files with 172 additions and 39 deletions

View File

@@ -26,12 +26,12 @@ use std::sync::Arc;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value as Json};
use snafu::{OptionExt, ResultExt, ensure};
use snafu::{OptionExt, ResultExt};
use crate::error::{self, InvalidJsonSnafu, Result, SerializeSnafu};
use crate::json::value::{JsonValue, JsonVariant};
use crate::types::json_type::{JsonNativeType, JsonNumberType, JsonObjectType};
use crate::types::{StructField, StructType};
use crate::types::{JsonType, StructField, StructType};
use crate::value::{ListValue, StructValue, Value};
/// The configuration of JSON encoding
@@ -305,32 +305,45 @@ fn encode_json_array_with_context<'a>(
) -> Result<JsonValue> {
let json_array_len = json_array.len();
let mut items = Vec::with_capacity(json_array_len);
let mut element_type = item_type.cloned();
for (index, value) in json_array.into_iter().enumerate() {
let array_context = context.with_key(&index.to_string());
let item_value =
encode_json_value_with_context(value, element_type.as_ref(), &array_context)?;
let item_type = item_value.json_type().native_type().clone();
items.push(item_value.into_variant());
let item_value = encode_json_value_with_context(value, None, &array_context)?;
items.push(item_value);
}
// Determine the common type for the list
if let Some(current_type) = &element_type {
// It's valid for json array to have different types of items, for example,
// ["a string", 1]. However, the `JsonValue` will be converted to Arrow list array,
// which requires all items have exactly same type. So we forbid the different types
// case here. Besides, it's not common for items in a json array to differ. So I think
// we are good here.
ensure!(
item_type == *current_type,
error::InvalidJsonSnafu {
value: "all items in json array must have the same type"
}
);
} else {
element_type = Some(item_type);
// In specification, it's valid for a JSON array to have different types of items, for example,
// ["a string", 1]. However, in implementation, the `JsonValue` will be converted to Arrow list
// array, which requires all items have exactly the same type. So we merge out the maybe
// different item types to a unified type, and align all the item values to it.
let provided_item_type = item_type.map(|x| JsonType::new_json2(x.clone()));
let merged_item_type = if let Some((first, rests)) = items.split_first() {
let mut merged = first.json_type().clone();
for rest in rests.iter().map(|x| x.json_type()) {
merged.merge(rest)?;
}
Some(merged)
} else {
None
};
let unified_item_type = match (provided_item_type, merged_item_type) {
(Some(mut x), Some(y)) => {
x.merge(&y)?;
Some(x)
}
(Some(x), None) | (None, Some(x)) => Some(x),
(None, None) => None,
};
if let Some(unified_item_type) = unified_item_type {
for item in &mut items {
item.try_align(&unified_item_type)?;
}
}
let items = items
.into_iter()
.map(|x| x.into_variant())
.collect::<Vec<_>>();
Ok(JsonValue::new(JsonVariant::Array(items)))
}
@@ -729,7 +742,7 @@ where
#[cfg(test)]
mod tests {
use common_base::bytes::Bytes;
use serde_json::json;
use super::*;
@@ -1050,11 +1063,21 @@ mod tests {
fn test_encode_json_array_mixed_types() {
let json = json!([1, "hello", true, 3.15]);
let settings = JsonStructureSettings::Structured(None);
let result = settings.encode_with_type(json, None);
assert_eq!(
result.unwrap_err().to_string(),
"Invalid JSON: all items in json array must have the same type"
);
let result = settings
.encode_with_type(json, None)
.unwrap()
.into_json_inner()
.unwrap();
if let Value::List(list_value) = result {
assert_eq!(list_value.items().len(), 4);
assert_eq!(
list_value.datatype(),
Arc::new(ConcreteDataType::binary_datatype())
);
} else {
panic!("Expected List value");
}
}
#[test]
@@ -1276,12 +1299,12 @@ mod tests {
#[test]
fn test_encode_json_array_with_item_type() {
let json = json!([1, 2, 3]);
let item_type = Arc::new(ConcreteDataType::uint64_datatype());
let item_type = Arc::new(ConcreteDataType::int64_datatype());
let settings = JsonStructureSettings::Structured(None);
let result = settings
.encode_with_type(
json,
Some(&JsonNativeType::Array(Box::new(JsonNativeType::u64()))),
Some(&JsonNativeType::Array(Box::new(JsonNativeType::i64()))),
)
.unwrap()
.into_json_inner()
@@ -1289,9 +1312,9 @@ mod tests {
if let Value::List(list_value) = result {
assert_eq!(list_value.items().len(), 3);
assert_eq!(list_value.items()[0], Value::UInt64(1));
assert_eq!(list_value.items()[1], Value::UInt64(2));
assert_eq!(list_value.items()[2], Value::UInt64(3));
assert_eq!(list_value.items()[0], Value::Int64(1));
assert_eq!(list_value.items()[1], Value::Int64(2));
assert_eq!(list_value.items()[2], Value::Int64(3));
assert_eq!(list_value.datatype(), item_type);
} else {
panic!("Expected List value");
@@ -2249,11 +2272,32 @@ mod tests {
)])),
);
let decoded_struct = settings.decode_struct(array_struct);
assert_eq!(
decoded_struct.unwrap_err().to_string(),
"Invalid JSON: all items in json array must have the same type"
);
let decoded_struct = settings.decode_struct(array_struct).unwrap();
let fields = decoded_struct.struct_type().fields();
let decoded_fields: Vec<&str> = fields.iter().map(|f| f.name()).collect();
assert!(decoded_fields.contains(&"value"));
if let Value::List(list_value) = &decoded_struct.items()[0] {
assert_eq!(list_value.items().len(), 4);
assert_eq!(
list_value.items()[0],
Value::Binary(Bytes::from("1".as_bytes()))
);
assert_eq!(
list_value.items()[1],
Value::Binary(Bytes::from(r#""hello""#.as_bytes()))
);
assert_eq!(
list_value.items()[2],
Value::Binary(Bytes::from("true".as_bytes()))
);
assert_eq!(
list_value.items()[3],
Value::Binary(Bytes::from("3.15".as_bytes()))
);
} else {
panic!("Expected array to be decoded as ListValue");
}
}
#[test]

View File

@@ -167,6 +167,18 @@ impl JsonVariant {
.map(|(k, v)| (k.clone(), v.native_type()))
.collect(),
),
// JsonVariant::Object(object) => {
// if object.is_empty() {
// JsonNativeType::Null
// } else {
// JsonNativeType::Object(
// object
// .iter()
// .map(|(k, v)| (k.clone(), v.native_type()))
// .collect(),
// )
// }
// }
JsonVariant::Variant(_) => JsonNativeType::Variant,
}
}
@@ -648,6 +660,18 @@ impl JsonVariantRef<'_> {
.map(|(k, v)| (k.to_string(), native_type(v)))
.collect(),
),
// JsonVariantRef::Object(object) => {
// if object.is_empty() {
// JsonNativeType::Null
// } else {
// JsonNativeType::Object(
// object
// .iter()
// .map(|(k, v)| (k.to_string(), native_type(v)))
// .collect(),
// )
// }
// }
JsonVariantRef::Variant(_) => JsonNativeType::Variant,
}
}

View File

@@ -115,6 +115,14 @@ impl JsonNativeType {
(JsonNativeType::Null, that) => that.clone(),
(this, JsonNativeType::Null) => this,
(this, that) if this == *that => this,
// (JsonNativeType::Number(x), JsonNativeType::Number(y)) => {
// JsonNativeType::Number(match (x, y) {
// (x, y) if x == y => *x,
// (JsonNumberType::F64, _) | (_, JsonNumberType::F64) => JsonNumberType::F64,
// _ => JsonNumberType::I64,
// })
// }
_ => JsonNativeType::Variant,
};
}

View File

@@ -26,12 +26,14 @@ use datatypes::arrow::datatypes::{ArrowNativeType, BinaryType, DataType, SchemaR
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::arrow_array::BinaryArray;
use datatypes::extension::json::is_json_extension_type;
use datatypes::timestamp::timestamp_array_to_primitive;
use datatypes::vectors::json::array::JsonArray;
use futures::{Stream, TryStreamExt};
use snafu::ResultExt;
use store_api::storage::SequenceNumber;
use crate::error::{ComputeArrowSnafu, Result};
use crate::error::{ComputeArrowSnafu, DataTypeMismatchSnafu, Result};
use crate::memtable::BoxedRecordBatchIterator;
use crate::metrics::READ_STAGE_ELAPSED;
use crate::read::BoxedRecordBatchStream;
@@ -258,6 +260,31 @@ impl BatchBuilder {
check_interleave_overflow(&self.batches, &self.schema, &self.indices)?;
// let columns = self
// .schema
// .fields()
// .iter()
// .enumerate()
// .map(|(column_idx, field)| {
// let arrays = self
// .batches
// .iter()
// .map(|(_, batch)| {
// let column = batch.column(column_idx);
// let column = if is_json_extension_type(field) {
// JsonArray::from(column)
// .try_align(field.data_type())
// .context(DataTypeMismatchSnafu)?
// } else {
// column.clone()
// };
// Ok(column)
// })
// .collect::<Result<Vec<_>>>()?;
// let aligned = arrays.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
// interleave(&aligned, &self.indices).context(ComputeArrowSnafu)
// })
// .collect::<Result<Vec<_>>>()?;
let columns = (0..self.schema.fields.len())
.map(|column_idx| {
let arrays: Vec<_> = self

View File

@@ -151,6 +151,14 @@ select j.c, j.y from json2_table order by ts;
| | false |
+-----------------------------------+-----------------------------------+
select j from json2_table order by ts;
Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected Struct() but found Struct("a": Struct("b": Binary, "x": Boolean), "c": Binary, "d": List(Struct("e": Struct("f": Float64, "g": Float64))), "y": Boolean) at column index 0
select * from json2_table order by ts;
Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected Struct() but found Struct("a": Struct("b": Binary, "x": Boolean), "c": Binary, "d": List(Struct("e": Struct("f": Float64, "g": Float64))), "y": Boolean) at column index 1
select j.a.b + 1 from json2_table order by ts;
+------------------------------------------------------------+
@@ -168,6 +176,19 @@ select j.a.b + 1 from json2_table order by ts;
| 11 |
+------------------------------------------------------------+
select abs(j.a.b) from json2_table order by ts;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Function 'abs' expects NativeType::Numeric but received NativeType::String No function matches the given name and argument types 'abs(Utf8View)'. You might need to add explicit type casts.
Candidate functions:
abs(Numeric(1))
-- "j.c" is of type "String", "abs" is expected to be all "null"s.
select abs(j.c) from json2_table order by ts;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Function 'abs' expects NativeType::Numeric but received NativeType::String No function matches the given name and argument types 'abs(Utf8View)'. You might need to add explicit type casts.
Candidate functions:
abs(Numeric(1))
select j.d from json2_table order by ts;
+-----------------------------------+

View File

@@ -46,8 +46,17 @@ select j.a, j.a.x from json2_table order by ts;
select j.c, j.y from json2_table order by ts;
select j from json2_table order by ts;
select * from json2_table order by ts;
select j.a.b + 1 from json2_table order by ts;
select abs(j.a.b) from json2_table order by ts;
-- "j.c" is of type "String", "abs" is expected to be all "null"s.
select abs(j.c) from json2_table order by ts;
select j.d from json2_table order by ts;
drop table json2_table;