feat: json2 insert

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-04-17 14:00:31 +08:00
parent fb3290e555
commit cf2c5155cd
19 changed files with 490 additions and 454 deletions

2
Cargo.lock generated
View File

@@ -5683,7 +5683,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=26a50f4069f50c37d65b45e0d39ae0cb42de5425#26a50f4069f50c37d65b45e0d39ae0cb42de5425"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0ee5a58ce2ea54de032faf13207ad089057ff288#0ee5a58ce2ea54de032faf13207ad089057ff288"
dependencies = [
"prost 0.14.1",
"prost-types 0.14.1",

View File

@@ -154,7 +154,7 @@ etcd-client = { version = "0.17", features = [
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "26a50f4069f50c37d65b45e0d39ae0cb42de5425" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0ee5a58ce2ea54de032faf13207ad089057ff288" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -919,6 +919,7 @@ pub fn encode_json_value(value: JsonValue) -> v1::JsonValue {
.collect::<Vec<_>>();
Some(json_value::Value::Object(JsonObject { entries }))
}
JsonVariant::Variant(x) => Some(json_value::Value::Variant(x)),
};
v1::JsonValue { value }
}
@@ -952,6 +953,7 @@ fn decode_json_value(value: &v1::JsonValue) -> JsonValueRef<'_> {
})
.collect::<BTreeMap<_, _>>()
.into(),
json_value::Value::Variant(x) => x.as_slice().into(),
}
}

View File

@@ -276,10 +276,6 @@ impl ConcreteDataType {
matches!(self, ConcreteDataType::Null(NullType))
}
pub(crate) fn is_struct(&self) -> bool {
matches!(self, ConcreteDataType::Struct(_))
}
/// Try to cast the type as a [`ListType`].
pub fn as_list(&self) -> Option<&ListType> {
match self {

View File

@@ -274,6 +274,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to align JSON value, reason: {}", reason))]
AlignJsonValue {
reason: String,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -316,7 +323,8 @@ impl ErrorExt for Error {
| ConvertScalarToArrowArray { .. }
| ParseExtendedType { .. }
| InconsistentStructFieldsAndItems { .. }
| ArrowMetadata { .. } => StatusCode::Internal,
| ArrowMetadata { .. }
| AlignJsonValue { .. } => StatusCode::Internal,
}
}

View File

@@ -21,9 +21,12 @@ use num_traits::ToPrimitive;
use ordered_float::OrderedFloat;
use serde::{Deserialize, Serialize};
use serde_json::Number;
use snafu::{OptionExt, ResultExt, ensure};
use crate::Result;
use crate::data_type::ConcreteDataType;
use crate::types::json_type::JsonNativeType;
use crate::error::{AlignJsonValueSnafu, SerializeSnafu};
use crate::types::json_type::{JsonNativeType, JsonNumberType};
use crate::types::{JsonType, StructField, StructType};
use crate::value::{ListValue, ListValueRef, StructValue, StructValueRef, Value, ValueRef};
@@ -105,19 +108,17 @@ impl Display for JsonNumber {
}
/// Variants of json.
///
/// This follows how [serde_json::Value] designs except that we only choose to use [BTreeMap] to
/// preserve the fields order by their names in the json object. (By default `serde_json` uses
/// [BTreeMap], too. But it additionally supports "IndexMap" which preserves the order by insertion
/// times of fields.)
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
pub enum JsonVariant {
#[default]
Null,
Bool(bool),
Number(JsonNumber),
String(String),
Array(Vec<JsonVariant>),
Object(BTreeMap<String, JsonVariant>),
/// A special "variant" value of JSON, representing the union of JSON values whose types conflict.
Variant(Vec<u8>),
}
impl JsonVariant {
@@ -166,6 +167,7 @@ impl JsonVariant {
.map(|(k, v)| (k.clone(), v.native_type()))
.collect(),
),
JsonVariant::Variant(_) => JsonNativeType::Variant,
}
}
@@ -192,6 +194,7 @@ impl JsonVariant {
.map(|(k, v)| (k.as_str(), v.as_ref()))
.collect(),
),
JsonVariant::Variant(v) => JsonVariantRef::Variant(v),
}
}
}
@@ -291,6 +294,13 @@ impl Display for JsonVariant {
.join(", ")
)
}
Self::Variant(x) => {
let result: serde_json::Result<serde_json::Value> = serde_json::from_slice(x);
match result {
Ok(v) => write!(f, "{v}"),
Err(_) => write!(f, "{x:?}"),
}
}
}
}
}
@@ -403,10 +413,97 @@ impl JsonValue {
}
Value::Struct(StructValue::new(items, StructType::new(Arc::new(fields))))
}
JsonVariant::Variant(x) => Value::Binary(x.into()),
}
}
helper(self.json_variant)
}
/// Aligns this JSON value, in place, to the `expected` JSON type.
///
/// Returns immediately when the value's current type already equals `expected`. Otherwise
/// the value is rewritten recursively:
/// - a null value, or any value aligned to the null type, becomes null;
/// - booleans and strings are kept when the expected type is the same kind;
/// - numbers are converted to the expected number type (u64, i64 or f64);
/// - every array item is aligned to the expected item type;
/// - an object may only contain fields that exist in the expected object type; expected
///   fields that are missing from the value are filled with null;
/// - any value aligned to the variant type is serialized into its JSON bytes.
///
/// On success the cached JSON type is replaced by `expected`.
///
/// # Errors
///
/// Fails with an `AlignJsonValueSnafu` error when a number cannot be represented in the
/// expected number type, when an object carries a field unknown to the expected type, or when
/// the value and the expected type are of incompatible kinds. Fails with a `SerializeSnafu`
/// error when a value cannot be serialized for the variant type.
///
/// The alignment consumes the original content, so on failure the value is left as JSON null.
/// The cached JSON type is cleared in that case so it does not keep describing the lost
/// content.
pub(crate) fn try_align(&mut self, expected: &JsonType) -> Result<()> {
    // Fast path: nothing to do when the value already has the expected type.
    if self.json_type() == expected {
        return Ok(());
    }

    // Recursively converts `value` into the shape described by `expected`.
    fn helper(value: JsonVariant, expected: &JsonNativeType) -> Result<JsonVariant> {
        Ok(match (value, expected) {
            // Null fits every type, and every value collapses to null under the null type.
            (JsonVariant::Null, _) | (_, JsonNativeType::Null) => JsonVariant::Null,
            (JsonVariant::Bool(v), JsonNativeType::Bool) => JsonVariant::Bool(v),
            (JsonVariant::Number(v), JsonNativeType::Number(n)) => {
                // The integer conversions yield `None` when the number does not fit.
                return match n {
                    JsonNumberType::U64 => v
                        .as_u64()
                        .map(|x| JsonVariant::Number(JsonNumber::PosInt(x))),
                    JsonNumberType::I64 => v
                        .as_i64()
                        .map(|x| JsonVariant::Number(JsonNumber::NegInt(x))),
                    JsonNumberType::F64 => {
                        Some(JsonVariant::Number(JsonNumber::Float(v.as_f64().into())))
                    }
                }
                .with_context(|| AlignJsonValueSnafu {
                    reason: format!("unable to align number {} to type {}", v, expected),
                });
            }
            (JsonVariant::String(v), JsonNativeType::String) => JsonVariant::String(v),
            (JsonVariant::Array(items), JsonNativeType::Array(expected)) => JsonVariant::Array(
                items
                    .into_iter()
                    .map(|item| helper(item, expected.as_ref()))
                    .collect::<Result<_>>()?,
            ),
            (JsonVariant::Object(mut kvs), JsonNativeType::Object(expected)) => {
                // The expected object type must be a superset of the value's fields.
                ensure!(
                    expected.keys().len() >= kvs.keys().len()
                        && kvs.keys().all(|k| expected.contains_key(k)),
                    AlignJsonValueSnafu {
                        reason: format!(
                            "aligned type '{}' should be superset of value '{}'",
                            JsonNativeType::Object(expected.clone()),
                            JsonVariant::from(kvs),
                        )
                    }
                );

                // Rebuild the object field by field in the expected layout, padding the
                // fields the value does not have with null.
                let mut object = BTreeMap::new();
                for (field, field_type) in expected {
                    if let Some((k, v)) = kvs.remove_entry(field) {
                        object.insert(k, helper(v, field_type)?);
                    } else {
                        object.insert(field.clone(), JsonVariant::Null);
                    }
                }
                JsonVariant::Object(object)
            }
            // Anything can be stored as a variant: keep its serialized JSON bytes.
            (v, JsonNativeType::Variant) => {
                let json: serde_json::Value =
                    JsonValue::new(v).try_into().context(SerializeSnafu)?;
                serde_json::to_vec(&json)
                    .map(JsonVariant::Variant)
                    .context(SerializeSnafu)?
            }
            (value, expected) => {
                return AlignJsonValueSnafu {
                    reason: format!(
                        "unable to align '{}' of type {} to type {}",
                        value,
                        value.native_type(),
                        expected,
                    ),
                }
                .fail();
            }
        })
    }

    // The helper consumes the variant, so it is moved out first (leaving the default, null).
    let x = std::mem::take(&mut self.json_variant);
    match helper(x, expected.native_type()) {
        Ok(aligned) => {
            self.json_variant = aligned;
            self.json_type = OnceLock::from(expected.clone());
            Ok(())
        }
        Err(e) => {
            // The original content is gone; drop the cached type so it cannot go stale.
            // NOTE(review): assumes `json_type()` lazily re-initializes an empty cache — confirm.
            self.json_type = OnceLock::new();
            Err(e)
        }
    }
}
}
impl<T: Into<JsonVariant>> From<T> for JsonValue {
@@ -418,10 +515,12 @@ impl<T: Into<JsonVariant>> From<T> for JsonValue {
}
}
impl From<JsonValue> for serde_json::Value {
fn from(v: JsonValue) -> Self {
fn helper(v: JsonVariant) -> serde_json::Value {
match v {
impl TryFrom<JsonValue> for serde_json::Value {
type Error = serde_json::Error;
fn try_from(v: JsonValue) -> serde_json::Result<Self> {
fn helper(v: JsonVariant) -> serde_json::Result<serde_json::Value> {
Ok(match v {
JsonVariant::Null => serde_json::Value::Null,
JsonVariant::Bool(x) => serde_json::Value::Bool(x),
JsonVariant::Number(x) => match x {
@@ -436,13 +535,21 @@ impl From<JsonValue> for serde_json::Value {
}
},
JsonVariant::String(x) => serde_json::Value::String(x),
JsonVariant::Array(array) => {
serde_json::Value::Array(array.into_iter().map(helper).collect())
}
JsonVariant::Object(object) => serde_json::Value::Object(
object.into_iter().map(|(k, v)| (k, helper(v))).collect(),
JsonVariant::Array(array) => serde_json::Value::Array(
array
.into_iter()
.map(helper)
.collect::<serde_json::Result<Vec<_>>>()?,
),
}
JsonVariant::Object(object) => {
let mut map = serde_json::Map::with_capacity(object.len());
for (k, v) in object {
map.insert(k, helper(v)?);
}
serde_json::Value::Object(map)
}
JsonVariant::Variant(x) => serde_json::from_slice(&x)?,
})
}
helper(v.json_variant)
}
@@ -496,6 +603,7 @@ pub enum JsonVariantRef<'a> {
String(&'a str),
Array(Vec<JsonVariantRef<'a>>),
Object(BTreeMap<&'a str, JsonVariantRef<'a>>),
Variant(&'a [u8]),
}
impl JsonVariantRef<'_> {
@@ -524,6 +632,7 @@ impl JsonVariantRef<'_> {
.map(|(k, v)| (k.to_string(), native_type(v)))
.collect(),
),
JsonVariantRef::Variant(_) => JsonNativeType::Variant,
}
}
JsonType::new_json2(native_type(self))
@@ -596,10 +705,17 @@ impl From<JsonVariantRef<'_>> for JsonVariant {
.map(|(k, v)| (k.to_string(), v.into()))
.collect(),
),
JsonVariantRef::Variant(x) => Self::Variant(x.to_vec()),
}
}
}
impl<'a> From<&'a [u8]> for JsonVariantRef<'a> {
fn from(value: &'a [u8]) -> Self {
Self::Variant(value)
}
}
/// Reference to representation of any valid JSON value.
#[derive(Debug, Serialize)]
pub struct JsonValueRef<'a> {
@@ -647,7 +763,7 @@ impl<'a> JsonValueRef<'a> {
}
}
pub fn as_value_ref(&self) -> ValueRef<'_> {
fn as_value_ref(&self) -> ValueRef<'_> {
fn helper<'a>(v: &'a JsonVariantRef) -> ValueRef<'a> {
match v {
JsonVariantRef::Null => ValueRef::Null,
@@ -683,6 +799,7 @@ impl<'a> JsonValueRef<'a> {
fields: StructType::new(Arc::new(fields)),
})
}
JsonVariantRef::Variant(x) => ValueRef::Binary(x),
}
}
helper(&self.json_variant)
@@ -691,6 +808,21 @@ impl<'a> JsonValueRef<'a> {
pub(crate) fn data_size(&self) -> usize {
size_of_val(self)
}
/// Returns a reference to the underlying [`JsonVariantRef`] of this JSON value reference.
pub(crate) fn variant(&self) -> &JsonVariantRef<'a> {
&self.json_variant
}
/// Returns this JSON value as a struct [`ValueRef`].
///
/// An object is returned as converted by `as_value_ref`. Any other value is wrapped into a
/// single-item struct whose fields come from this value's JSON type.
pub(crate) fn as_struct_value(&self) -> ValueRef<'_> {
    let inner = self.as_value_ref();
    if self.is_object() {
        inner
    } else {
        ValueRef::Struct(StructValueRef::RefList {
            val: vec![inner],
            fields: self.json_type().as_struct_type(),
        })
    }
}
}
impl<'a, T: Into<JsonVariantRef<'a>>> From<T> for JsonValueRef<'a> {
@@ -735,3 +867,100 @@ impl Clone for JsonValueRef<'_> {
}
}
}
// Unit tests for aligning a `JsonValue` to an expected `JsonType` via `try_align`.
#[cfg(test)]
mod tests {
use super::*;
use crate::types::json_type::JsonObjectType;
// Exercises `try_align` against a null target, an object superset target, an object target
// that misses a field, a root-level variant target, and an incompatible scalar target.
#[test]
fn test_align_json_value() -> Result<()> {
// Parses a JSON text into a `JsonValue`, panicking on malformed input.
fn parse_json_value(json: &str) -> JsonValue {
let value: serde_json::Value = serde_json::from_str(json).unwrap();
value.into()
}
// Root type can be aligned to Null, and the cached json_type must be refreshed.
let mut value = JsonValue::from(true);
assert_eq!(
value.json_type(),
&JsonType::new_json2(JsonNativeType::Bool)
);
value.try_align(&JsonType::null())?;
assert_eq!(value, JsonValue::null());
assert_eq!(value.json_type(), &JsonType::null());
// Object alignment now requires the expected type to be a superset of the
// value fields, while still filling missing expected fields with null.
let expected = JsonType::new_json2(JsonNativeType::Object(JsonObjectType::from([
("extra".to_string(), JsonNativeType::u64()),
(
"items".to_string(),
JsonNativeType::Array(Box::new(JsonNativeType::Object(JsonObjectType::from([
("id".to_string(), JsonNativeType::u64()),
("payload".to_string(), JsonNativeType::Variant),
("note".to_string(), JsonNativeType::String),
])))),
),
("name".to_string(), JsonNativeType::String),
])));
let mut value = parse_json_value(r#"{"items":[{"id":1,"payload":{"k":"v"}}],"extra":1}"#);
assert_ne!(value.json_type(), &expected);
value.try_align(&expected)?;
assert_eq!(
value,
JsonValue::from(JsonVariant::Object(BTreeMap::from([
("extra".to_string(), JsonVariant::from(1_u64)),
(
"items".to_string(),
JsonVariant::Array(vec![JsonVariant::Object(BTreeMap::from([
("id".to_string(), JsonVariant::from(1_u64)),
("note".to_string(), JsonVariant::Null),
(
"payload".to_string(),
JsonVariant::Variant(br#"{"k":"v"}"#.to_vec()),
),
]))]),
),
("name".to_string(), JsonVariant::Null),
])))
);
assert_eq!(value.json_type(), &expected);
// Object alignment should fail if the expected type misses any field from the value.
let expected = JsonType::new_json2(JsonNativeType::Object(JsonObjectType::from([(
"items".to_string(),
JsonNativeType::Array(Box::new(JsonNativeType::Object(JsonObjectType::from([
("id".to_string(), JsonNativeType::u64()),
("payload".to_string(), JsonNativeType::Variant),
])))),
)])));
let mut value =
parse_json_value(r#"{"items":[{"id":1,"payload":{"k":"v"},"extra":true}]}"#);
let err = value.try_align(&expected).unwrap_err();
assert_eq!(
err.to_string(),
r#"Failed to align JSON value, reason: aligned type '{"id":"<Number>","payload":"<Variant>"}' should be superset of value '{ extra: true, id: 1, payload: { k: v } }'"#
);
// Root-level Variant alignment should preserve the original JSON payload.
let mut value = parse_json_value(r#"{"foo":[1,true,null]}"#);
value.try_align(&JsonType::new_json2(JsonNativeType::Variant))?;
assert_eq!(
value,
JsonValue::from(JsonVariant::Variant(br#"{"foo":[1,true,null]}"#.to_vec()))
);
// Incompatible scalar alignment should fail instead of coercing the value.
let mut value = JsonValue::from("hello");
let err = value
.try_align(&JsonType::new_json2(JsonNativeType::Bool))
.unwrap_err();
assert_eq!(
err.to_string(),
r#"Failed to align JSON value, reason: unable to align 'hello' of type "<String>" to type "<Bool>""#
);
Ok(())
}
}

View File

@@ -18,7 +18,6 @@ use std::str::FromStr;
use std::sync::{Arc, LazyLock};
use arrow::datatypes::DataType as ArrowDataType;
use arrow_schema::Fields;
use common_base::bytes::Bytes;
use regex::{Captures, Regex};
use serde::{Deserialize, Serialize};
@@ -240,17 +239,6 @@ impl JsonType {
}
}
/// Check if it can merge with `other` json type.
pub(crate) fn is_mergeable(&self, other: &JsonType) -> bool {
match (&self.format, &other.format) {
(JsonFormat::Jsonb, JsonFormat::Jsonb) => true,
(JsonFormat::Json2(this), JsonFormat::Json2(that)) => {
is_mergeable(this.as_ref(), that.as_ref())
}
_ => false,
}
}
/// Check if it includes all fields in `other` json type.
pub fn is_include(&self, other: &JsonType) -> bool {
match (&self.format, &other.format) {
@@ -297,31 +285,6 @@ pub(crate) fn plain_json_struct_type(item_type: ConcreteDataType) -> StructType
StructType::new(Arc::new(vec![field]))
}
fn is_mergeable(this: &JsonNativeType, that: &JsonNativeType) -> bool {
fn is_mergeable_object(this: &JsonObjectType, that: &JsonObjectType) -> bool {
for (type_name, that_type) in that {
if let Some(this_type) = this.get(type_name)
&& !is_mergeable(this_type, that_type)
{
return false;
}
}
true
}
match (this, that) {
(this, that) if this == that => true,
(JsonNativeType::Array(this), JsonNativeType::Array(that)) => {
is_mergeable(this.as_ref(), that.as_ref())
}
(JsonNativeType::Object(this), JsonNativeType::Object(that)) => {
is_mergeable_object(this, that)
}
(JsonNativeType::Null, _) | (_, JsonNativeType::Null) => true,
_ => false,
}
}
fn merge(this: &JsonNativeType, that: &JsonNativeType) -> JsonNativeType {
fn merge_object(this: &JsonObjectType, that: &JsonObjectType) -> JsonObjectType {
let mut this = this.clone();
@@ -354,7 +317,14 @@ impl DataType for JsonType {
fn name(&self) -> String {
match &self.format {
JsonFormat::Jsonb => JSON_TYPE_NAME.to_string(),
JsonFormat::Json2(_) => JSON2_TYPE_NAME.to_string(),
JsonFormat::Json2(x) => format!(
"{JSON2_TYPE_NAME}{}",
if x.is_null() {
"".to_string()
} else {
x.to_string()
}
),
}
}
@@ -369,12 +339,7 @@ impl DataType for JsonType {
fn as_arrow_type(&self) -> ArrowDataType {
match self.format {
JsonFormat::Jsonb => ArrowDataType::Binary,
// "Erase" the JSON struct when converting to Arrow datatype, is a feature (not a bug).
// The actual Arrow datatype is deduced from parquet data and query schema (a process
// called "JSON type concretization") from time to time, there's no a universal/global
// type for JSON2.
// Same reason for ignoring the struct in the `name` method above.
JsonFormat::Json2(_) => ArrowDataType::Struct(Fields::empty()),
JsonFormat::Json2(_) => self.as_struct_type().as_arrow_type(),
}
}

View File

@@ -932,7 +932,7 @@ impl TryFrom<Value> for serde_json::Value {
.collect::<serde_json::Result<Map<String, serde_json::Value>>>()?;
serde_json::Value::Object(map)
}
Value::Json(v) => (*v).into(),
Value::Json(v) => (*v).try_into()?,
};
Ok(json_value)

View File

@@ -13,191 +13,46 @@
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use std::sync::LazyLock;
use crate::data_type::ConcreteDataType;
use crate::error::{Result, TryFromValueSnafu, UnsupportedOperationSnafu};
use crate::json::value::JsonValueRef;
use crate::json::value::{JsonValue, JsonVariant};
use crate::prelude::{ValueRef, Vector, VectorRef};
use crate::types::JsonType;
use crate::types::json_type::JsonNativeType;
use crate::types::{JsonType, json_type};
use crate::value::StructValueRef;
use crate::vectors::{MutableVector, StructVectorBuilder};
struct JsonStructsBuilder {
json_type: JsonType,
inner: StructVectorBuilder,
}
impl JsonStructsBuilder {
fn new(json_type: JsonType, capacity: usize) -> Self {
let struct_type = json_type.as_struct_type();
let inner = StructVectorBuilder::with_type_and_capacity(struct_type, capacity);
Self { json_type, inner }
}
fn len(&self) -> usize {
self.inner.len()
}
fn push(&mut self, json: &JsonValueRef) -> Result<()> {
let mut value = json.as_value_ref();
if !json.is_object() {
let fields = json_type::plain_json_struct_type(value.data_type());
value = ValueRef::Struct(StructValueRef::RefList {
val: vec![value],
fields,
})
}
self.inner.try_push_value_ref(&value)
}
/// Try to merge (and consume the data of) other json vector builder into this one.
/// Note that the other builder's json type must be able to be merged with this one's
/// (this one's json type has all the fields in other one's, and no datatypes conflict).
/// Normally this is guaranteed, as long as json values are pushed through [JsonVectorBuilder].
fn try_merge(&mut self, other: &mut JsonStructsBuilder) -> Result<()> {
debug_assert!(self.json_type.is_mergeable(&other.json_type));
fn helper(this: &mut StructVectorBuilder, that: &mut StructVectorBuilder) -> Result<()> {
let that_len = that.len();
if let Some(x) = that.mut_null_buffer().finish() {
this.mut_null_buffer().append_buffer(&x)
} else {
this.mut_null_buffer().append_n_non_nulls(that_len);
}
let that_fields = that.struct_type().fields();
let mut that_builders = that_fields
.iter()
.zip(that.mut_value_builders().iter_mut())
.map(|(field, builder)| (field.name(), builder))
.collect::<HashMap<_, _>>();
for (field, this_builder) in this
.struct_type()
.fields()
.iter()
.zip(this.mut_value_builders().iter_mut())
{
if let Some(that_builder) = that_builders.get_mut(field.name()) {
if field.data_type().is_struct() {
let this = this_builder
.as_mut_any()
.downcast_mut::<StructVectorBuilder>()
// Safety: a struct datatype field must be corresponding to a struct vector builder.
.unwrap();
let that = that_builder
.as_mut_any()
.downcast_mut::<StructVectorBuilder>()
// Safety: other builder with same field name must have same datatype,
// ensured because the two json types are mergeable.
.unwrap();
helper(this, that)?;
} else {
let vector = that_builder.to_vector();
this_builder.extend_slice_of(vector.as_ref(), 0, vector.len())?;
}
} else {
this_builder.push_nulls(that_len);
}
}
Ok(())
}
helper(&mut self.inner, &mut other.inner)
}
/// Same as [JsonStructsBuilder::try_merge], but does not consume the other builder's data.
fn try_merge_cloned(&mut self, other: &JsonStructsBuilder) -> Result<()> {
debug_assert!(self.json_type.is_mergeable(&other.json_type));
fn helper(this: &mut StructVectorBuilder, that: &StructVectorBuilder) -> Result<()> {
let that_len = that.len();
if let Some(x) = that.null_buffer().finish_cloned() {
this.mut_null_buffer().append_buffer(&x)
} else {
this.mut_null_buffer().append_n_non_nulls(that_len);
}
let that_fields = that.struct_type().fields();
let that_builders = that_fields
.iter()
.zip(that.value_builders().iter())
.map(|(field, builder)| (field.name(), builder))
.collect::<HashMap<_, _>>();
for (field, this_builder) in this
.struct_type()
.fields()
.iter()
.zip(this.mut_value_builders().iter_mut())
{
if let Some(that_builder) = that_builders.get(field.name()) {
if field.data_type().is_struct() {
let this = this_builder
.as_mut_any()
.downcast_mut::<StructVectorBuilder>()
// Safety: a struct datatype field must be corresponding to a struct vector builder.
.unwrap();
let that = that_builder
.as_any()
.downcast_ref::<StructVectorBuilder>()
// Safety: other builder with same field name must have same datatype,
// ensured because the two json types are mergeable.
.unwrap();
helper(this, that)?;
} else {
let vector = that_builder.to_vector_cloned();
this_builder.extend_slice_of(vector.as_ref(), 0, vector.len())?;
}
} else {
this_builder.push_nulls(that_len);
}
}
Ok(())
}
helper(&mut self.inner, &other.inner)
}
}
/// The vector builder for json type values.
///
/// Json type are dynamic, to some degree (as long as they can be merged into each other). So are
/// json values. Json values are physically stored in struct vectors, which require the types of
/// struct values to be fixed inside a certain struct vector. So to resolve "dynamic" vs "fixed"
/// datatype problem, in this builder, each type of json value gets its own struct vector builder.
/// Once new json type value is pushing into this builder, it creates a new "child" builder for it.
///
/// Given the "mixed" nature of the values stored in this builder, to produce the json vector, a
/// "merge" operation is performed. The "merge" is to iterate over all the "child" builders, and fill
/// nulls for missing json fields. The final vector's json type is fixed to be the "merge" of all
/// pushed json types.
#[derive(Clone)]
pub(crate) struct JsonVectorBuilder {
merged_type: JsonType,
capacity: usize,
builders: Vec<JsonStructsBuilder>,
values: Vec<JsonValue>,
}
impl JsonVectorBuilder {
pub(crate) fn new(json_type: JsonNativeType, capacity: usize) -> Self {
Self {
merged_type: JsonType::new_json2(json_type),
capacity,
builders: vec![],
values: Vec::with_capacity(capacity),
}
}
fn try_create_new_builder(&mut self, json_type: &JsonType) -> Result<&mut JsonStructsBuilder> {
self.merged_type.merge(json_type)?;
fn try_build(&mut self) -> Result<VectorRef> {
let mut builder = StructVectorBuilder::with_type_and_capacity(
self.merged_type.as_struct_type(),
self.values.len(),
);
for value in self.values.iter_mut() {
value.try_align(&self.merged_type)?;
let builder = JsonStructsBuilder::new(json_type.clone(), self.capacity);
self.builders.push(builder);
if value.is_null() {
builder.push_null();
continue;
}
let len = self.builders.len();
Ok(&mut self.builders[len - 1])
let value = value.as_ref();
builder.try_push_value_ref(&value.as_struct_value())?;
}
Ok(builder.to_vector())
}
}
@@ -207,7 +62,7 @@ impl MutableVector for JsonVectorBuilder {
}
fn len(&self) -> usize {
self.builders.iter().map(|x| x.len()).sum()
self.values.len()
}
fn as_any(&self) -> &dyn Any {
@@ -219,37 +74,11 @@ impl MutableVector for JsonVectorBuilder {
}
fn to_vector(&mut self) -> VectorRef {
// Fast path:
if self.builders.len() == 1 {
return self.builders[0].inner.to_vector();
}
let mut unified_jsons = JsonStructsBuilder::new(self.merged_type.clone(), self.capacity);
for builder in self.builders.iter_mut() {
unified_jsons
.try_merge(builder)
// Safety: the "unified_jsons" has the merged json type from all the builders,
// so it should merge them without errors.
.unwrap_or_else(|e| panic!("failed to merge json builders, error: {e}"));
}
unified_jsons.inner.to_vector()
self.try_build().unwrap_or_else(|e| panic!("{}", e))
}
fn to_vector_cloned(&self) -> VectorRef {
// Fast path:
if self.builders.len() == 1 {
return self.builders[0].inner.to_vector_cloned();
}
let mut unified_jsons = JsonStructsBuilder::new(self.merged_type.clone(), self.capacity);
for builder in self.builders.iter() {
unified_jsons
.try_merge_cloned(builder)
// Safety: the "unified_jsons" has the merged json type from all the builders,
// so it should merge them without errors.
.unwrap_or_else(|e| panic!("failed to merge json builders, error: {e}"));
}
unified_jsons.inner.to_vector_cloned()
self.clone().to_vector()
}
fn try_push_value_ref(&mut self, value: &ValueRef) -> Result<()> {
@@ -260,29 +89,15 @@ impl MutableVector for JsonVectorBuilder {
.fail();
};
let json_type = value.json_type();
self.merged_type.merge(json_type)?;
let builder = match self.builders.last_mut() {
Some(last) => {
// TODO(LFC): use "is_include" and amend json value with nulls
if &last.json_type != json_type {
self.try_create_new_builder(json_type)?
} else {
last
}
}
None => self.try_create_new_builder(json_type)?,
};
builder.push(value.as_ref())
let value = JsonValue::new(JsonVariant::from(value.variant().clone()));
self.values.push(value);
Ok(())
}
fn push_null(&mut self) {
static NULL_JSON: LazyLock<ValueRef> =
LazyLock::new(|| ValueRef::Json(Box::new(JsonValueRef::null())));
self.try_push_value_ref(&NULL_JSON)
// Safety: learning from the method "try_push_value_ref", a null json value should be
// always able to push into any json vectors.
.unwrap_or_else(|e| panic!("failed to push null json value, error: {e}"));
self.values.push(JsonValue::null())
}
fn extend_slice_of(&mut self, _: &dyn Vector, _: usize, _: usize) -> Result<()> {
@@ -293,3 +108,107 @@ impl MutableVector for JsonVectorBuilder {
.fail()
}
}
// Unit tests for `JsonVectorBuilder`: pushing JSON values and building the final vector.
#[cfg(test)]
mod tests {
use common_base::bytes::Bytes;
use super::*;
use crate::data_type::ConcreteDataType;
use crate::types::json_type::JsonObjectType;
use crate::value::{StructValue, Value, ValueRef};
// Covers three scenarios: objects merged into a superset type (with a null row in between),
// a root-level type conflict resolved to a variant type, and rejection of non-JSON values.
#[test]
fn test_json_vector_builder() -> Result<()> {
// Parses a JSON text into a `Value::Json`, panicking on malformed input.
fn parse_json_value(json: &str) -> Value {
let value: serde_json::Value = serde_json::from_str(json).unwrap();
Value::Json(Box::new(value.into()))
}
// Object inputs should merge into a superset schema, preserve null rows,
// and align conflicting nested values into Variant payloads.
let mut builder = JsonVectorBuilder::new(JsonNativeType::Object(Default::default()), 3);
let first = parse_json_value(r#"{"id":1,"payload":{"name":"foo"}}"#);
let second = parse_json_value(r#"{"id":2,"extra":true,"payload":"raw"}"#);
builder.try_push_value_ref(&first.as_value_ref())?;
builder.push_null();
builder.try_push_value_ref(&second.as_value_ref())?;
let merged_type = JsonType::new_json2(JsonNativeType::Object(JsonObjectType::from([
("extra".to_string(), JsonNativeType::Bool),
("id".to_string(), JsonNativeType::i64()),
("payload".to_string(), JsonNativeType::Variant),
])));
assert_eq!(
builder.data_type(),
ConcreteDataType::Json(merged_type.clone())
);
let merged_struct_type = merged_type.as_struct_type();
let vector = builder.to_vector();
assert_eq!(vector.len(), 3);
assert_eq!(
vector.get(0),
Value::Struct(StructValue::new(
vec![
Value::Null,
Value::Int64(1),
Value::Binary(Bytes::from(br#"{"name":"foo"}"#.to_vec())),
],
merged_struct_type.clone(),
))
);
assert_eq!(vector.get(1), Value::Null);
assert_eq!(
vector.get(2),
Value::Struct(StructValue::new(
vec![
Value::Boolean(true),
Value::Int64(2),
Value::Binary(Bytes::from(br#""raw""#.to_vec())),
],
merged_struct_type,
))
);
// Root-level conflicts should be lifted to a plain Variant field that preserves
// each original JSON payload.
let mut variant_builder = JsonVectorBuilder::new(JsonNativeType::Bool, 2);
let object = parse_json_value(r#"{"k":1}"#);
let boolean = parse_json_value("true");
variant_builder.try_push_value_ref(&boolean.as_value_ref())?;
variant_builder.try_push_value_ref(&object.as_value_ref())?;
let variant_type = JsonType::new_json2(JsonNativeType::Variant);
assert_eq!(
variant_builder.data_type(),
ConcreteDataType::Json(variant_type.clone())
);
let variant_struct_type = variant_type.as_struct_type();
let vector = variant_builder.to_vector();
assert_eq!(
vector.get(0),
Value::Struct(StructValue::new(
vec![Value::Binary(Bytes::from(b"true".to_vec()))],
variant_struct_type.clone(),
))
);
assert_eq!(
vector.get(1),
Value::Struct(StructValue::new(
vec![Value::Binary(Bytes::from(br#"{"k":1}"#.to_vec()))],
variant_struct_type,
))
);
// Non-JSON values should be rejected at push time.
let mut invalid_builder = JsonVectorBuilder::new(JsonNativeType::Bool, 1);
let err = invalid_builder
.try_push_value_ref(&ValueRef::Boolean(true))
.unwrap_err();
assert!(err.to_string().contains("expected json value"));
Ok(())
}
}

View File

@@ -122,7 +122,7 @@ impl Vector for StructVector {
.map(|i| {
let field_array = &self.array.column(i);
if field_array.is_null(i) {
if field_array.is_null(index) {
Value::Null
} else {
let scalar_value = ScalarValue::try_from_array(field_array, index).unwrap();
@@ -323,26 +323,6 @@ impl StructVectorBuilder {
}
self.null_buffer.append_null();
}
pub(crate) fn struct_type(&self) -> &StructType {
&self.fields
}
pub(crate) fn value_builders(&self) -> &[Box<dyn MutableVector>] {
&self.value_builders
}
pub(crate) fn mut_value_builders(&mut self) -> &mut [Box<dyn MutableVector>] {
&mut self.value_builders
}
pub(crate) fn null_buffer(&self) -> &NullBufferBuilder {
&self.null_buffer
}
pub(crate) fn mut_null_buffer(&mut self) -> &mut NullBufferBuilder {
&mut self.null_buffer
}
}
impl MutableVector for StructVectorBuilder {

View File

@@ -32,6 +32,7 @@ use datatypes::arrow::datatypes::{
DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
};
use datatypes::data_type::DataType;
use datatypes::extension::json::is_json_extension_type;
use datatypes::prelude::{MutableVector, Vector};
use datatypes::value::ValueRef;
use datatypes::vectors::Helper;
@@ -678,7 +679,10 @@ impl BulkPartConverter {
columns.push(values.sequence.to_arrow_array());
columns.push(values.op_type.to_arrow_array());
let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
// The actual datatype of a JSON array is data-oriented: it cannot be derived from the Region
// metadata, which is static. So here we have to align the schema with the arrays.
let schema = align_schema_with_json_array(self.schema, &columns);
let batch = RecordBatch::try_new(schema, columns).context(NewRecordBatchSnafu)?;
// Sorts the record batch.
let batch = sort_primary_key_record_batch(&batch)?;
@@ -693,6 +697,26 @@ impl BulkPartConverter {
}
}
/// Builds a schema whose JSON extension fields take the datatype of their matching array.
///
/// Fields and `columns` are paired by position. A schema without any JSON extension field is
/// returned untouched. Otherwise non-JSON fields are kept as they are, and every JSON field is
/// copied with its datatype replaced by the datatype of the array at the same position. The
/// schema metadata is carried over.
fn align_schema_with_json_array(schema: SchemaRef, columns: &[ArrayRef]) -> SchemaRef {
    let has_json_field = schema.fields().iter().any(|f| is_json_extension_type(f));
    if !has_json_field {
        return schema;
    }

    let aligned_fields = schema
        .fields()
        .iter()
        .zip(columns)
        .map(|(field, array)| {
            if is_json_extension_type(field) {
                // Take the concrete datatype from the data instead of the static schema.
                let mut aligned = field.as_ref().clone();
                aligned.set_data_type(array.data_type().clone());
                Arc::new(aligned)
            } else {
                field.clone()
            }
        })
        .collect::<Vec<_>>();

    Arc::new(Schema::new_with_metadata(
        aligned_fields,
        schema.metadata().clone(),
    ))
}
fn new_primary_key_column_builders(
metadata: &RegionMetadata,
capacity: usize,

View File

@@ -893,7 +893,9 @@ impl ValueBuilder {
size += field_value.data_size();
if !field_value.is_null() || self.fields[idx].is_some() {
if let Some(field) = self.fields[idx].as_mut() {
let _ = field.push(field_value);
field
.push(field_value)
.unwrap_or_else(|e| panic!("Failed to push field value: {e:?}"));
} else {
let mut mutable_vector =
if let ConcreteDataType::String(_) = &self.field_types[idx] {

View File

@@ -352,11 +352,10 @@ impl Inserter {
&self,
insert: &Insert,
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Output> {
let (inserts, table_info) =
StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
.convert(insert, ctx, statement_executor)
.convert(insert, ctx)
.await?;
let table_infos = HashMap::from_iter([(table_info.table_id(), table_info.clone())]);

View File

@@ -12,23 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cell::LazyCell;
use std::collections::HashMap;
use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
use api::v1::alter_table_expr::Kind;
use api::v1::column_def::options_from_column_schema;
use api::v1::region::InsertRequests as RegionInsertRequests;
use api::v1::{
AlterTableExpr, ColumnSchema as GrpcColumnSchema, ModifyColumnType, ModifyColumnTypes, Row,
Rows,
};
use api::v1::{ColumnSchema as GrpcColumnSchema, Row, Rows};
use catalog::CatalogManager;
use common_telemetry::info;
use common_time::Timezone;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaRef};
use datatypes::types::JsonType;
use datatypes::value::Value;
use partition::manager::PartitionRuleManager;
use session::context::{QueryContext, QueryContextRef};
@@ -47,7 +37,6 @@ use crate::error::{
use crate::insert::InstantAndNormalInsertRequests;
use crate::req_convert::common::partitioner::Partitioner;
use crate::req_convert::insert::semantic_type;
use crate::statement::StatementExecutor;
const DEFAULT_PLACEHOLDER_VALUE: &str = "default";
@@ -74,11 +63,10 @@ impl<'a> StatementToRegion<'a> {
&self,
stmt: &Insert,
query_ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<(InstantAndNormalInsertRequests, TableInfoRef)> {
let name = stmt.table_name().context(ParseSqlSnafu)?;
let (catalog, schema, table_name) = self.get_full_name(name)?;
let mut table = self.get_table(&catalog, &schema, &table_name).await?;
let table = self.get_table(&catalog, &schema, &table_name).await?;
let table_schema = table.schema();
ensure!(
@@ -128,26 +116,9 @@ impl<'a> StatementToRegion<'a> {
.collect::<Result<Vec<_>>>()
}
let mut insert_columns = find_insert_columns(&table, &column_names)?;
let insert_columns = find_insert_columns(&table, &column_names)?;
let converter = SqlRowConverter::new(&insert_columns, query_ctx);
// Convert the SQL values to GreptimeDB values, merging the "largest" JSON type of all
// values along the way with `JsonColumnTypeUpdater`.
let mut updater = JsonColumnTypeUpdater::new(statement_executor, query_ctx);
let value_rows = converter.convert(&mut updater, &sql_rows)?;
// If the JSON values have a "larger" json type than the one in the table schema, modify
// the column's json type first, by executing an "alter table" DDL.
if updater
.maybe_update_column_type(&catalog, &schema, &table_name, &insert_columns)
.await?
{
// Update with the latest schema, if changed.
table = self.get_table(&catalog, &schema, &table_name).await?;
insert_columns = find_insert_columns(&table, &column_names)?;
}
// Finally convert GreptimeDB values to GRPC values, ready to do insertion on Datanode.
let value_rows = converter.convert(&sql_rows)?;
for (i, row) in value_rows.into_iter().enumerate() {
for value in row {
let grpc_value = to_grpc_value(value);
@@ -248,11 +219,7 @@ impl<'a, 'b> SqlRowConverter<'a, 'b> {
}
}
fn convert(
&self,
updater: &mut JsonColumnTypeUpdater<'_, 'a>,
sql_rows: &[Vec<SqlValue>],
) -> Result<Vec<Vec<Value>>> {
fn convert(&self, sql_rows: &[Vec<SqlValue>]) -> Result<Vec<Vec<Value>>> {
let timezone = Some(&self.query_context.timezone());
let auto_string_to_numeric = self.query_context.auto_string_to_numeric();
@@ -263,9 +230,6 @@ impl<'a, 'b> SqlRowConverter<'a, 'b> {
for (insert_column, sql_value) in self.insert_columns.iter().zip(sql_row) {
let value =
sql_value_to_value(insert_column, sql_value, timezone, auto_string_to_numeric)?;
updater.merge_types(insert_column, &value)?;
value_row.push(value);
}
value_rows.push(value_row);
@@ -274,108 +238,6 @@ impl<'a, 'b> SqlRowConverter<'a, 'b> {
}
}
/// Collects the merged JSON type of the values inserted into each JSON column, and alters the
/// table's column types when the merged type is not included in the column's current one.
struct JsonColumnTypeUpdater<'a, 'b> {
/// Executor used to run the "alter table" expression.
statement_executor: &'a StatementExecutor,
/// Query context handed to the executor together with the "alter table" expression.
query_context: &'a QueryContextRef,
/// Merged JSON type of the values seen so far, keyed by column name.
merged_value_types: LazyCell<HashMap<&'b str, JsonType>>,
}
impl<'a, 'b> JsonColumnTypeUpdater<'a, 'b> {
    /// Creates an updater that has not recorded any JSON type yet.
    fn new(statement_executor: &'a StatementExecutor, query_context: &'a QueryContextRef) -> Self {
        Self {
            merged_value_types: LazyCell::new(Default::default),
            statement_executor,
            query_context,
        }
    }

    /// Folds the JSON type of `value` into the merged type recorded for `column_schema`.
    ///
    /// Values that are not JSON are ignored. The first JSON value of a column seeds the
    /// merged type; later values are merged in only when the merged type does not already
    /// include theirs.
    ///
    /// # Errors
    ///
    /// Returns an "invalid insert request" error if the two JSON types cannot be merged.
    fn merge_types(&mut self, column_schema: &'b ColumnSchema, value: &Value) -> Result<()> {
        let Value::Json(_) = value else {
            return Ok(());
        };
        let ConcreteDataType::Json(value_type) = value.data_type() else {
            return Ok(());
        };

        let merged_type = self
            .merged_value_types
            .entry(&column_schema.name)
            .or_insert_with(|| value_type.clone());
        if merged_type.is_include(&value_type) {
            return Ok(());
        }

        if let Err(e) = merged_type.merge(&value_type) {
            return Err(InvalidInsertRequestSnafu {
                reason: format!(r#"cannot merge "{value_type}" into "{merged_type}": {e}"#),
            }
            .build());
        }
        Ok(())
    }

    /// Alters the JSON type of every recorded column whose current type does not include the
    /// merged type of the inserted values.
    ///
    /// Recorded columns that are missing from `insert_columns`, or whose data type is not
    /// JSON, are skipped. One "alter table" expression is executed per column that needs a
    /// new type.
    ///
    /// Returns `true` if at least one column type was altered.
    ///
    /// # Errors
    ///
    /// Fails if the merged type cannot be merged into the column type, if the new type cannot
    /// be converted to its gRPC representation, or if executing the "alter table" fails.
    async fn maybe_update_column_type(
        self,
        catalog: &str,
        schema: &str,
        table: &str,
        insert_columns: &[&ColumnSchema],
    ) -> Result<bool> {
        let mut altered = false;
        for (column_name, merged_type) in self.merged_value_types.iter() {
            let current_type = insert_columns
                .iter()
                .find(|column| &column.name == column_name)
                .and_then(|column| column.data_type.as_json());
            let Some(column_type) = current_type else {
                continue;
            };
            if column_type.is_include(merged_type) {
                continue;
            }

            // Merge into a copy, so that the error message can still show the original type.
            let mut new_column_type = column_type.clone();
            if let Err(e) = new_column_type.merge(merged_type) {
                return Err(InvalidInsertRequestSnafu {
                    reason: format!(r#"cannot merge "{merged_type}" into "{column_type}": {e}"#),
                }
                .build());
            }

            info!(
                "updating table {}.{}.{} column {} json type: {} => {}",
                catalog, schema, table, column_name, column_type, new_column_type,
            );

            let wrapper = ColumnDataTypeWrapper::try_from(ConcreteDataType::Json(new_column_type))
                .context(ColumnDataTypeSnafu)?;
            let (target_type, target_type_extension) = wrapper.into_parts();
            let modification = ModifyColumnType {
                column_name: column_name.to_string(),
                target_type: target_type as i32,
                target_type_extension,
            };
            let alter_expr = AlterTableExpr {
                catalog_name: catalog.to_string(),
                schema_name: schema.to_string(),
                table_name: table.to_string(),
                kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes {
                    modify_column_types: vec![modification],
                })),
            };
            self.statement_executor
                .alter_table_inner(alter_expr, self.query_context.clone())
                .await?;
            altered = true;
        }
        Ok(altered)
    }
}
fn column_names<'a>(stmt: &'a Insert, table_schema: &'a SchemaRef) -> Vec<&'a String> {
if !stmt.columns().is_empty() {
stmt.columns()

View File

@@ -28,7 +28,7 @@ impl StatementExecutor {
if insert.can_extract_values() {
// Fast path: plain insert ("insert with literal values") is executed directly
self.inserter
.handle_statement_insert(insert.as_ref(), &query_ctx, self)
.handle_statement_insert(insert.as_ref(), &query_ctx)
.await
} else {
// Slow path: insert with subquery. Execute using query engine.

View File

@@ -418,7 +418,7 @@ fn resolve_schema(
match (column_type, value_type) {
(column_type, value_type) if column_type == value_type => Ok(()),
(ConcreteDataType::Json(column_type), ConcreteDataType::Json(value_type))
if column_type.is_include(value_type) =>
if column_type.is_json2() && value_type.is_json2() =>
{
Ok(())
}

View File

@@ -310,7 +310,7 @@ pub fn sql_data_type_to_concrete_data_type(
JsonStructureSettings::UnstructuredRaw => Some(JsonNativeType::Variant),
_ => None,
})
.unwrap_or(JsonNativeType::Null);
.unwrap_or(JsonNativeType::Object(Default::default()));
let format = JsonFormat::Json2(Box::new(native_type));
Ok(ConcreteDataType::Json(JsonType::new(format)))
}

View File

@@ -8,6 +8,36 @@ create table json2_table (
Affected Rows: 0
insert into json2_table (ts, j)
values (1, '{"a": {"b": 1}, "c": "s1", "d": [{"e": {"f": 0.1}}]}'),
(2, '{"a": {"b": -2}, "c": "s2", "d": [{"e": {"f": 0.2}}]}');
Affected Rows: 2
insert into json2_table (ts, j)
values (3, '{"a": {"b": 3}, "c": "s3"}');
Affected Rows: 1
insert into json2_table
values (4, '{"a": {"b": -4}, "d": [{"e": {"g": -0.4}}]}'),
(5, '{"a": {}, "c": "s5"}'),
(6, '{"c": "s6"}');
Affected Rows: 3
insert into json2_table
values (7, '{"a": {"b": "s7"}, "c": [1], "d": [{"e": {"g": -0.7}}]}'),
(8, '{"a": {"b": 8}, "c": "s8"}');
Affected Rows: 2
insert into json2_table
values (9, '{"a": {"x": true}, "c": "s9", "d": [{"e": {"g": -0.9}}]}'),
(10, '{"a": {"b": 10}, "y": false}');
Affected Rows: 2
drop table json2_table;
Affected Rows: 0

View File

@@ -6,4 +6,24 @@ create table json2_table (
'sst_format' = 'flat',
);
insert into json2_table (ts, j)
values (1, '{"a": {"b": 1}, "c": "s1", "d": [{"e": {"f": 0.1}}]}'),
(2, '{"a": {"b": -2}, "c": "s2", "d": [{"e": {"f": 0.2}}]}');
insert into json2_table (ts, j)
values (3, '{"a": {"b": 3}, "c": "s3"}');
insert into json2_table
values (4, '{"a": {"b": -4}, "d": [{"e": {"g": -0.4}}]}'),
(5, '{"a": {}, "c": "s5"}'),
(6, '{"c": "s6"}');
insert into json2_table
values (7, '{"a": {"b": "s7"}, "c": [1], "d": [{"e": {"g": -0.7}}]}'),
(8, '{"a": {"b": 8}, "c": "s8"}');
insert into json2_table
values (9, '{"a": {"x": true}, "c": "s9", "d": [{"e": {"g": -0.9}}]}'),
(10, '{"a": {"b": 10}, "y": false}');
drop table json2_table;