From a27c4f6005e767165fbb8df3853058f1beefd9da Mon Sep 17 00:00:00 2001 From: luofucong Date: Tue, 3 Mar 2026 17:34:05 +0800 Subject: [PATCH] x Signed-off-by: luofucong --- src/api/src/helper.rs | 3 +- src/common/function/src/scalars/json.rs | 3 + .../function/src/scalars/json/json2_get.rs | 136 +++++++++++++++ src/common/recordbatch/src/recordbatch.rs | 5 + src/common/sql/src/convert.rs | 2 +- src/datatypes/src/schema.rs | 1 + src/datatypes/src/schema/ext.rs | 25 +++ src/datatypes/src/types/json_type.rs | 68 +++++++- src/datatypes/src/value.rs | 2 +- src/datatypes/src/vectors/json.rs | 1 + src/datatypes/src/vectors/json/builder2.rs | 163 ++++++++++++++++++ src/mito2/src/engine/basic_test.rs | 20 +-- src/mito2/src/flush.rs | 13 ++ src/mito2/src/memtable.rs | 64 +++++++ src/mito2/src/memtable/bulk.rs | 8 + src/mito2/src/memtable/bulk/part.rs | 109 +++++++++++- src/mito2/src/memtable/time_series.rs | 4 +- src/mito2/src/read/flat_projection.rs | 5 + src/mito2/src/read/projection.rs | 8 + src/mito2/src/read/range.rs | 6 + src/mito2/src/read/scan_region.rs | 60 +++++++ src/mito2/src/sst/parquet/flat_format.rs | 1 + src/mito2/src/sst/parquet/reader.rs | 7 +- src/mito2/src/sst/parquet/writer.rs | 2 +- .../src/req_convert/insert/stmt_to_region.rs | 26 ++- src/query/src/datafusion.rs | 1 + .../src/datafusion/json2_expr_planner.rs | 54 ++++++ src/query/src/datafusion/planner.rs | 6 +- src/sql/src/statements.rs | 44 +++-- src/sql/src/statements/create.rs | 47 ++--- .../common/types/json/json-structured.result | 82 --------- .../common/types/json/json-structured.sql | 28 --- .../standalone/common/types/json/json2.result | 129 ++++++++++++++ .../standalone/common/types/json/json2.sql | 42 +++++ 34 files changed, 992 insertions(+), 183 deletions(-) create mode 100644 src/common/function/src/scalars/json/json2_get.rs create mode 100644 src/datatypes/src/schema/ext.rs create mode 100644 src/datatypes/src/vectors/json/builder2.rs create mode 100644 src/query/src/datafusion/json2_expr_planner.rs delete mode 100644 tests/cases/standalone/common/types/json/json-structured.result delete mode 100644 tests/cases/standalone/common/types/json/json-structured.sql create mode 100644 tests/cases/standalone/common/types/json/json2.result create mode 100644 tests/cases/standalone/common/types/json/json2.sql diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 4664c0434b..6d5ea13461 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -129,7 +129,7 @@ impl From for ConcreteDataType { }; ConcreteDataType::json_native_datatype(inner_type.into()) } - None => ConcreteDataType::Json(JsonType::null()), + None => ConcreteDataType::Json(JsonType::new(JsonFormat::Json2)), _ => { // invalid state, type extension is missing or invalid ConcreteDataType::null_datatype() @@ -461,6 +461,7 @@ impl TryFrom for ColumnDataTypeWrapper { }) } } + JsonFormat::Json2 => Some(ColumnDataTypeExtension { type_ext: None }), } } else { None diff --git a/src/common/function/src/scalars/json.rs b/src/common/function/src/scalars/json.rs index 801744a0fa..f4b33d4802 100644 --- a/src/common/function/src/scalars/json.rs +++ b/src/common/function/src/scalars/json.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod json2_get; pub mod json_get; mod json_is; mod json_path_exists; @@ -24,6 +25,7 @@ use json_is::{ JsonIsArray, JsonIsBool, JsonIsFloat, JsonIsInt, JsonIsNull, JsonIsObject, JsonIsString, }; use json_to_string::JsonToStringFunction; +use json2_get::Json2GetFunction; use parse_json::ParseJsonFunction; use crate::function_registry::FunctionRegistry; @@ -42,6 +44,7 @@ impl JsonFunction { registry.register_scalar(JsonGetBool::default()); registry.register_scalar(JsonGetObject::default()); registry.register_scalar(JsonGetWithType::default()); + registry.register_scalar(Json2GetFunction::default()); registry.register_scalar(JsonIsNull::default()); registry.register_scalar(JsonIsInt::default()); diff --git a/src/common/function/src/scalars/json/json2_get.rs b/src/common/function/src/scalars/json/json2_get.rs new file mode 100644 index 0000000000..d63922bc3d --- /dev/null +++ b/src/common/function/src/scalars/json/json2_get.rs @@ -0,0 +1,136 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use arrow_cast::display::array_value_to_string; +use datafusion_common::arrow::array::{ + Array, ArrayRef, StringViewBuilder, StructArray, new_null_array, +}; +use datafusion_common::arrow::datatypes::DataType; +use datafusion_common::{DataFusionError, Result, ScalarValue, exec_err}; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, TypeSignature, Volatility}; +use derive_more::Display; + +use crate::function::Function; + +#[derive(Display, Debug)] +#[display("{}", Self::NAME.to_ascii_uppercase())] +pub struct Json2GetFunction { + signature: Signature, +} + +impl Json2GetFunction { + pub const NAME: &'static str = "json2_get"; +} + +impl Function for Json2GetFunction { + fn name(&self) -> &str { + Self::NAME + } + + fn return_type(&self, _: &[DataType]) -> Result { + Ok(DataType::Utf8View) + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + if args.args.len() != 2 { + return exec_err!("json2_get expects 2 arguments, got {}", args.args.len()); + } + + let input = args.args[0].to_array(args.number_rows)?; + let path = path_from_arg(&args.args[1])?; + + let segments: Vec<&str> = if path.is_empty() { + Vec::new() + } else { + path.split('.').collect() + }; + let Some(struct_path) = resolve_struct_path(&input, &segments) else { + return Ok(ColumnarValue::Array(new_null_array( + args.return_type(), + input.len(), + ))); + }; + + let values = display_array_from_path(&struct_path)?; + Ok(ColumnarValue::Array(values)) + } +} + +impl Default for Json2GetFunction { + fn default() -> Self { + Self { + signature: Signature::one_of(vec![TypeSignature::Any(2)], Volatility::Immutable), + } + } +} + +fn path_from_arg(arg: &ColumnarValue) -> Result<&String> { + match arg { + ColumnarValue::Scalar(ScalarValue::Utf8(Some(path))) + | ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(path))) + | ColumnarValue::Scalar(ScalarValue::Utf8View(Some(path))) => Ok(path), + ColumnarValue::Scalar(_) => exec_err!("json2_get expects a string path"), + ColumnarValue::Array(_) => exec_err!("json2_get expects a literal path"), + } +} + +struct StructPath { + parents: Vec, + leaf: ArrayRef, +} + +fn resolve_struct_path(array: &ArrayRef, segments: &[&str]) -> Option { + let mut current = array.clone(); + let mut parents = Vec::with_capacity(segments.len()); + + for segment in segments { + let struct_array = current.as_any().downcast_ref::()?; + let DataType::Struct(fields) = current.data_type() else { + unreachable!() + }; + let index = fields.iter().position(|field| field.name() == *segment)?; + parents.push(current.clone()); + current = struct_array.column(index).clone(); + } + + Some(StructPath { + parents, + leaf: current, + }) +} + +fn struct_path_is_null(parents: &[ArrayRef], index: usize) -> bool { + parents.iter().any(|parent| parent.is_null(index)) +} + +fn display_array_from_path(path: &StructPath) -> Result { + let mut builder = StringViewBuilder::with_capacity(path.leaf.len()); + for index in 0..path.leaf.len() { + if struct_path_is_null(&path.parents, index) || path.leaf.is_null(index) { + builder.append_null(); + continue; + } + + let value = array_value_to_string(path.leaf.as_ref(), index) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?; + builder.append_value(value); + } + Ok(Arc::new(builder.finish())) +} diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs index f6e0aeed93..15e25e0352 100644 --- a/src/common/recordbatch/src/recordbatch.rs +++ b/src/common/recordbatch/src/recordbatch.rs @@ -395,6 +395,11 @@ pub fn align_json_array(json_array: &ArrayRef, schema_type: &ArrowDataType) -> R &array_columns[j], schema_field.data_type(), )?); + } else if schema_field.data_type() != array_field.data_type() { + aligned.push( + compute::cast(&array_columns[j], schema_field.data_type()) + .context(ArrowComputeSnafu)?, + ); } else { aligned.push(array_columns[j].clone()); } diff --git a/src/common/sql/src/convert.rs b/src/common/sql/src/convert.rs index bd9a1d0769..32a2407db4 100644 --- a/src/common/sql/src/convert.rs +++ b/src/common/sql/src/convert.rs @@ -306,7 +306,7 @@ pub(crate) fn parse_string_to_value( let v = parse_string_to_jsonb(&s).context(DatatypeSnafu)?; Ok(Value::Binary(v.into())) } - JsonFormat::Native(_) => { + JsonFormat::Native(_) | JsonFormat::Json2 => { let extension_type: Option = column_schema.extension_type().context(DatatypeSnafu)?; let json_structure_settings = extension_type diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index 9070e2babe..65607ddf29 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -14,6 +14,7 @@ mod column_schema; pub mod constraint; +pub mod ext; use std::collections::HashMap; use std::fmt; diff --git a/src/datatypes/src/schema/ext.rs b/src/datatypes/src/schema/ext.rs new file mode 100644 index 0000000000..d36e6f13d8 --- /dev/null +++ b/src/datatypes/src/schema/ext.rs @@ -0,0 +1,25 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::extension::json; + +pub trait ArrowSchemaExt { + fn has_json_extension_field(&self) -> bool; +} + +impl ArrowSchemaExt for arrow_schema::Schema { + fn has_json_extension_field(&self) -> bool { + self.fields().iter().any(json::is_json_extension_type) + } +} diff --git a/src/datatypes/src/types/json_type.rs b/src/datatypes/src/types/json_type.rs index 61586fc460..19b1624c41 100644 --- a/src/datatypes/src/types/json_type.rs +++ b/src/datatypes/src/types/json_type.rs @@ -12,12 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::borrow::Cow; use std::collections::BTreeMap; use std::fmt::{Debug, Display, Formatter}; use std::str::FromStr; use std::sync::{Arc, LazyLock}; use arrow::datatypes::DataType as ArrowDataType; +use arrow_schema::Fields; use common_base::bytes::Bytes; use regex::{Captures, Regex}; use serde::{Deserialize, Serialize}; @@ -33,6 +35,7 @@ use crate::type_id::LogicalTypeId; use crate::types::{ListType, StructField, StructType}; use crate::value::Value; use crate::vectors::json::builder::JsonVectorBuilder; +use crate::vectors::json::builder2::Json2VectorBuilder; use crate::vectors::{BinaryVectorBuilder, MutableVector}; pub const JSON_TYPE_NAME: &str = "Json"; @@ -164,6 +167,7 @@ pub enum JsonFormat { #[default] Jsonb, Native(Box), + Json2, } /// JsonType is a data type for JSON data. It is stored as binary data of jsonb format. @@ -192,6 +196,7 @@ impl JsonType { match &self.format { JsonFormat::Jsonb => &JsonNativeType::String, JsonFormat::Native(x) => x.as_ref(), + JsonFormat::Json2 => unimplemented!(), } } @@ -212,15 +217,24 @@ impl JsonType { ConcreteDataType::Struct(t) => t.clone(), x => plain_json_struct_type(x), }, + JsonFormat::Json2 => unimplemented!(), } } /// Try to merge this json type with others, error on datatype conflict. pub fn merge(&mut self, other: &JsonType) -> Result<()> { + self.merge_with(other, false) + } + + pub fn merge_with_lifting(&mut self, other: &JsonType) -> Result<()> { + self.merge_with(other, true) + } + + fn merge_with(&mut self, other: &JsonType, lift: bool) -> Result<()> { match (&self.format, &other.format) { (JsonFormat::Jsonb, JsonFormat::Jsonb) => Ok(()), (JsonFormat::Native(this), JsonFormat::Native(that)) => { - let merged = merge(this.as_ref(), that.as_ref())?; + let merged = merge(this.as_ref(), that.as_ref(), lift)?; self.format = JsonFormat::Native(Box::new(merged)); Ok(()) } @@ -313,13 +327,17 @@ fn is_mergeable(this: &JsonNativeType, that: &JsonNativeType) -> bool { } } -fn merge(this: &JsonNativeType, that: &JsonNativeType) -> Result { - fn merge_object(this: &JsonObjectType, that: &JsonObjectType) -> Result { +fn merge(this: &JsonNativeType, that: &JsonNativeType, lift: bool) -> Result { + fn merge_object( + this: &JsonObjectType, + that: &JsonObjectType, + lift: bool, + ) -> Result { let mut this = this.clone(); // merge "that" into "this" directly: for (type_name, that_type) in that { if let Some(this_type) = this.get_mut(type_name) { - let merged_type = merge(this_type, that_type)?; + let merged_type = merge(this_type, that_type, lift)?; *this_type = merged_type; } else { this.insert(type_name.clone(), that_type.clone()); @@ -331,16 +349,45 @@ fn merge(this: &JsonNativeType, that: &JsonNativeType) -> Result match (this, that) { (this, that) if this == that => Ok(this.clone()), (JsonNativeType::Array(this), JsonNativeType::Array(that)) => { - merge(this.as_ref(), that.as_ref()).map(|x| JsonNativeType::Array(Box::new(x))) + merge(this.as_ref(), that.as_ref(), lift).map(|x| JsonNativeType::Array(Box::new(x))) } (JsonNativeType::Object(this), JsonNativeType::Object(that)) => { - merge_object(this, that).map(JsonNativeType::Object) + merge_object(this, that, lift).map(JsonNativeType::Object) } (JsonNativeType::Null, x) | (x, JsonNativeType::Null) => Ok(x.clone()), - _ => MergeJsonDatatypeSnafu { - reason: format!("datatypes have conflict, this: {this}, that: {that}"), + _ => { + if lift { + Ok(JsonNativeType::String) + } else { + MergeJsonDatatypeSnafu { + reason: format!("datatypes have conflict, this: {this}, that: {that}"), + } + .fail() + } } - .fail(), + } +} + +pub fn merge_as_json_type<'a>( + left: &'a ArrowDataType, + right: &ArrowDataType, +) -> Cow<'a, ArrowDataType> { + if left == right { + return Cow::Borrowed(left); + } + + let mut left = JsonType::from(left); + let right = JsonType::from(right); + Cow::Owned(if left.merge_with_lifting(&right).is_ok() { + left.as_arrow_type() + } else { + ArrowDataType::Utf8 + }) +} + +impl From<&ArrowDataType> for JsonType { + fn from(t: &ArrowDataType) -> Self { + JsonType::new_native(JsonNativeType::from(&ConcreteDataType::from_arrow_type(t))) } } @@ -349,6 +396,7 @@ impl DataType for JsonType { match &self.format { JsonFormat::Jsonb => JSON_TYPE_NAME.to_string(), JsonFormat::Native(x) => format!("Json<{x}>"), + JsonFormat::Json2 => "JSON2".to_string(), } } @@ -364,6 +412,7 @@ impl DataType for JsonType { match self.format { JsonFormat::Jsonb => ArrowDataType::Binary, JsonFormat::Native(_) => self.as_struct_type().as_arrow_type(), + JsonFormat::Json2 => ArrowDataType::Struct(Fields::empty()), } } @@ -371,6 +420,7 @@ impl DataType for JsonType { match &self.format { JsonFormat::Jsonb => Box::new(BinaryVectorBuilder::with_capacity(capacity)), JsonFormat::Native(x) => Box::new(JsonVectorBuilder::new(*x.clone(), capacity)), + JsonFormat::Json2 => Box::new(Json2VectorBuilder::new(JsonNativeType::Null, capacity)), } } diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index ab64c801e5..71c1423da1 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -3206,7 +3206,7 @@ pub(crate) mod tests { ] .into(), )), - 48, + 56, ); } diff --git a/src/datatypes/src/vectors/json.rs b/src/datatypes/src/vectors/json.rs index 83aa1dd2aa..b783d3c1a5 100644 --- a/src/datatypes/src/vectors/json.rs +++ b/src/datatypes/src/vectors/json.rs @@ -13,3 +13,4 @@ // limitations under the License. pub(crate) mod builder; +pub(crate) mod builder2; diff --git a/src/datatypes/src/vectors/json/builder2.rs b/src/datatypes/src/vectors/json/builder2.rs new file mode 100644 index 0000000000..5fff890dfc --- /dev/null +++ b/src/datatypes/src/vectors/json/builder2.rs @@ -0,0 +1,163 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::borrow::Cow; +use std::sync::LazyLock; + +use crate::data_type::ConcreteDataType; +use crate::error::{Result, TryFromValueSnafu, UnsupportedOperationSnafu}; +use crate::json::value::{JsonValue, JsonValueRef, JsonVariant}; +use crate::prelude::{ValueRef, Vector, VectorRef}; +use crate::types::JsonType; +use crate::types::json_type::JsonNativeType; +use crate::vectors::{MutableVector, StructVectorBuilder}; + +pub(crate) struct Json2VectorBuilder { + merged_type: JsonType, + capacity: usize, + values: Vec, +} + +impl Json2VectorBuilder { + pub(crate) fn new(json_type: JsonNativeType, capacity: usize) -> Self { + Self { + merged_type: JsonType::new_native(json_type), + capacity, + values: vec![], + } + } + + fn build(&self) -> VectorRef { + let mut builder = StructVectorBuilder::with_type_and_capacity( + self.merged_type.as_struct_type(), + self.capacity, + ); + for value in self.values.iter() { + let value = align_json_value_with_type(&self.merged_type, value); + builder + .try_push_value_ref(&(*value).as_ref().as_value_ref()) + // Safety: after the `align_json_value_with_type`, the values to push must have + // the same types with the builder, so it's not expected to meet any errors here. + .unwrap_or_else(|e| panic!("Failed to push JSON value {value}: {e:?}")); + } + builder.to_vector() + } +} + +impl MutableVector for Json2VectorBuilder { + fn data_type(&self) -> ConcreteDataType { + ConcreteDataType::Json(self.merged_type.clone()) + } + + fn len(&self) -> usize { + self.values.len() + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn Any { + self + } + + fn to_vector(&mut self) -> VectorRef { + self.build() + } + + fn to_vector_cloned(&self) -> VectorRef { + self.build() + } + + fn try_push_value_ref(&mut self, value: &ValueRef) -> Result<()> { + let ValueRef::Json(value) = value else { + return TryFromValueSnafu { + reason: format!("expected json value, got {value:?}"), + } + .fail(); + }; + let json_type = value.json_type(); + self.merged_type.merge_with_lifting(json_type)?; + + let value = JsonValue::from(value.clone().into_variant()); + self.values.push(value); + Ok(()) + } + + fn push_null(&mut self) { + static NULL_JSON: LazyLock = + LazyLock::new(|| ValueRef::Json(Box::new(JsonValueRef::null()))); + self.try_push_value_ref(&NULL_JSON) + // Safety: learning from the method "try_push_value_ref", a null json value should be + // always able to push into any json vectors. + .unwrap_or_else(|e| panic!("failed to push null json value, error: {e}")); + } + + fn extend_slice_of(&mut self, _: &dyn Vector, _: usize, _: usize) -> Result<()> { + UnsupportedOperationSnafu { + op: "extend_slice_of", + vector_type: "JsonVector", + } + .fail() + } +} + +fn align_json_value_with_type<'a>( + expected_type: &JsonType, + value: &'a JsonValue, +) -> Cow<'a, JsonValue> { + if value.json_type() == expected_type { + return Cow::Borrowed(value); + } + + fn helper(expected_type: &JsonNativeType, value: JsonVariant) -> JsonVariant { + match (expected_type, value) { + (_, JsonVariant::Null) | (JsonNativeType::Null, _) => JsonVariant::Null, + (JsonNativeType::Bool, JsonVariant::Bool(v)) => JsonVariant::Bool(v), + (JsonNativeType::Number(_), JsonVariant::Number(v)) => JsonVariant::Number(v), + (JsonNativeType::String, JsonVariant::String(v)) => JsonVariant::String(v), + + (JsonNativeType::Array(item_type), JsonVariant::Array(items)) => JsonVariant::Array( + items + .into_iter() + .map(|item| helper(item_type.as_ref(), item)) + .collect(), + ), + + (JsonNativeType::Object(expected_fields), JsonVariant::Object(object)) => { + JsonVariant::Object( + expected_fields + .iter() + .map(|(field_name, expected_field_type)| { + let value = + object.get(field_name).cloned().unwrap_or(JsonVariant::Null); + (field_name.clone(), helper(expected_field_type, value)) + }) + .collect(), + ) + } + + (JsonNativeType::String, v) => { + let json: serde_json::Value = JsonValue::from(v).into(); + JsonVariant::String(json.to_string()) + } + + (t, v) => panic!("unsupported json alignment cast from {v} to {t}"), + } + } + + let value = helper(expected_type.native_type(), value.clone().into_variant()); + Cow::Owned(JsonValue::from(value)) +} diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index ed638bf69b..8e874ba67a 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -981,17 +981,17 @@ async fn test_list_ssts_with_format( #[tokio::test] async fn test_all_index_metas_list_all_types() { test_all_index_metas_list_all_types_with_format(false, r#" -PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6500), index_type: "bloom_filter", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 751, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":640,\"row_count\":20,\"rows_per_segment\":2,\"segment_count\":10}}"), node_id: None } -PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6500), index_type: "fulltext_bloom", target_type: "column", target_key: "4", target_json: "{\"column\":4}", blob_size: 89, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":64,\"row_count\":20,\"rows_per_segment\":4,\"segment_count\":5},\"fulltext\":{\"analyzer\":\"English\",\"case_sensitive\":false}}"), node_id: None } -PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6500), index_type: "fulltext_tantivy", target_type: "column", target_key: "5", target_json: "{\"column\":5}", blob_size: 1100, meta_json: Some("{\"fulltext\":{\"analyzer\":\"Chinese\",\"case_sensitive\":true}}"), node_id: None } -PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6500), index_type: "inverted", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 518, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":150,\"inverted_index_size\":518,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None } -PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6500), index_type: "inverted", target_type: "column", target_key: "2", target_json: "{\"column\":2}", blob_size: 515, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":147,\"inverted_index_size\":515,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }"#).await; +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6600), index_type: "bloom_filter", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 751, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":640,\"row_count\":20,\"rows_per_segment\":2,\"segment_count\":10}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6600), index_type: "fulltext_bloom", target_type: "column", target_key: "4", target_json: "{\"column\":4}", blob_size: 89, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":64,\"row_count\":20,\"rows_per_segment\":4,\"segment_count\":5},\"fulltext\":{\"analyzer\":\"English\",\"case_sensitive\":false}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6600), index_type: "fulltext_tantivy", target_type: "column", target_key: "5", target_json: "{\"column\":5}", blob_size: 1100, meta_json: Some("{\"fulltext\":{\"analyzer\":\"Chinese\",\"case_sensitive\":true}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6600), index_type: "inverted", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 518, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":150,\"inverted_index_size\":518,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6600), index_type: "inverted", target_type: "column", target_key: "2", target_json: "{\"column\":2}", blob_size: 515, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":147,\"inverted_index_size\":515,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }"#).await; test_all_index_metas_list_all_types_with_format(true, r#" -PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6500), index_type: "bloom_filter", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 751, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":640,\"row_count\":20,\"rows_per_segment\":2,\"segment_count\":10}}"), node_id: None } -PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6500), index_type: "fulltext_bloom", target_type: "column", target_key: "4", target_json: "{\"column\":4}", blob_size: 89, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":64,\"row_count\":20,\"rows_per_segment\":4,\"segment_count\":5},\"fulltext\":{\"analyzer\":\"English\",\"case_sensitive\":false}}"), node_id: None } -PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6500), index_type: "fulltext_tantivy", target_type: "column", target_key: "5", target_json: "{\"column\":5}", blob_size: 1100, meta_json: Some("{\"fulltext\":{\"analyzer\":\"Chinese\",\"case_sensitive\":true}}"), node_id: None } -PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6500), index_type: "inverted", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 518, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":150,\"inverted_index_size\":518,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None } -PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6500), index_type: "inverted", target_type: "column", target_key: "2", target_json: "{\"column\":2}", blob_size: 515, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":147,\"inverted_index_size\":515,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }"#).await; +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6600), index_type: "bloom_filter", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 751, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":640,\"row_count\":20,\"rows_per_segment\":2,\"segment_count\":10}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6600), index_type: "fulltext_bloom", target_type: "column", target_key: "4", target_json: "{\"column\":4}", blob_size: 89, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":64,\"row_count\":20,\"rows_per_segment\":4,\"segment_count\":5},\"fulltext\":{\"analyzer\":\"English\",\"case_sensitive\":false}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6600), index_type: "fulltext_tantivy", target_type: "column", target_key: "5", target_json: "{\"column\":5}", blob_size: 1100, meta_json: Some("{\"fulltext\":{\"analyzer\":\"Chinese\",\"case_sensitive\":true}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6600), index_type: "inverted", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 518, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":150,\"inverted_index_size\":518,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6600), index_type: "inverted", target_type: "column", target_key: "2", target_json: "{\"column\":2}", blob_size: 515, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":147,\"inverted_index_size\":515,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }"#).await; } async fn test_all_index_metas_list_all_types_with_format(flat_format: bool, expect_format: &str) { diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index dd575ac687..be0702010f 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -22,6 +22,7 @@ use std::time::Instant; use common_telemetry::{debug, error, info}; use datatypes::arrow::datatypes::SchemaRef; +use datatypes::schema::ext::ArrowSchemaExt; use either::Either; use partition::expr::PartitionExpr; use smallvec::{SmallVec, smallvec}; @@ -40,6 +41,7 @@ use crate::error::{ RegionTruncatedSnafu, Result, }; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; +use crate::memtable; use crate::memtable::bulk::ENCODE_ROW_THRESHOLD; use crate::memtable::{ BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges, RangesOptions, @@ -587,6 +589,7 @@ impl RegionFlushTask { &version.metadata, &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding), ); + let batch_schema = maybe_merge_json_fields(batch_schema, &mem_ranges); let flat_sources = memtable_flat_sources( batch_schema, mem_ranges, @@ -762,6 +765,16 @@ struct FlatSources { encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>, } +fn maybe_merge_json_fields(base: SchemaRef, mem_ranges: &MemtableRanges) -> SchemaRef { + if !base.has_json_extension_field() { + return base; + } + let Some(schema) = mem_ranges.schema() else { + return base; + }; + memtable::merge_json_extension_fields(&base, &[schema]) +} + /// Returns the max sequence and [FlatSource] for the given memtable. fn memtable_flat_sources( schema: SchemaRef, diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index a352a10805..63db9dc2bb 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -14,6 +14,7 @@ //! Memtables are write buffers for regions. +use std::borrow::Cow; use std::collections::BTreeMap; use std::fmt; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -59,6 +60,10 @@ pub use bulk::part::{ BulkPart, BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size, sort_primary_key_record_batch, }; +use datatypes::arrow::datatypes::{Schema, SchemaRef}; +use datatypes::extension::json; +use datatypes::schema::ext::ArrowSchemaExt; +use datatypes::types::json_type; #[cfg(any(test, feature = "test"))] pub use time_partition::filter_record_batch; @@ -225,6 +230,55 @@ impl MemtableRanges { .max() .unwrap_or(0) } + + pub(crate) fn schema(&self) -> Option { + let mut schemas = self + .ranges + .values() + .filter_map(|x| x.record_batch_schema()) + .collect::>(); + + if schemas.iter().all(|x| !x.has_json_extension_field()) { + // If there are no JSON extension fields in any schemas, the invariant must be hold, + // that all schemas are same (they are all derived from same region metadata). + // So it's ok to return the first one as the schema of the whole memtable ranges. + return (!schemas.is_empty()).then(|| schemas.swap_remove(0)); + } + + // If there are JSON extension fields, by convention, only their concrete data types + // (Arrow's Struct) may differ. Other things like the metadata or the fields count are same. + // So to produce the final schema, we can solely merge the data types. + schemas + .split_first() + .map(|(first, rest)| merge_json_extension_fields(first, rest)) + } +} + +pub(crate) fn merge_json_extension_fields(base: &SchemaRef, others: &[SchemaRef]) -> SchemaRef { + let mut fields = base.fields().iter().cloned().collect::>(); + for (i, field) in fields.iter_mut().enumerate() { + if !json::is_json_extension_type(field) { + continue; + } + + let merged = others + .iter() + .map(|x| Cow::Borrowed(x.field(i).data_type())) + .reduce(|acc, e| { + Cow::Owned(json_type::merge_as_json_type(acc.as_ref(), e.as_ref()).into_owned()) + }); + if let Some(merged) = merged + && field.data_type() != merged.as_ref() + { + let merged = + json_type::merge_as_json_type(field.data_type(), merged.as_ref()).into_owned(); + + let mut new = field.as_ref().clone(); + new.set_data_type(merged); + *field = Arc::new(new); + } + } + Arc::new(Schema::new_with_metadata(fields, base.metadata().clone())) } impl IterBuilder for MemtableRanges { @@ -552,6 +606,11 @@ pub trait IterBuilder: Send + Sync { .fail() } + /// Returns the schema of record batches produced by this iterator. + fn record_batch_schema(&self) -> Option { + None + } + /// Returns the [EncodedRange] if the range is already encoded into SST. fn encoded_range(&self) -> Option { None @@ -646,6 +705,11 @@ impl MemtableRange { self.context.builder.is_record_batch() } + /// Returns the schema of record batches if this range supports record batch iteration. + pub fn record_batch_schema(&self) -> Option { + self.context.builder.record_batch_schema() + } + pub fn num_rows(&self) -> usize { self.stats.num_rows } diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs index 3817560932..dbc168637f 100644 --- a/src/mito2/src/memtable/bulk.rs +++ b/src/mito2/src/memtable/bulk.rs @@ -819,6 +819,10 @@ impl IterBuilder for BulkRangeIterBuilder { fn encoded_range(&self) -> Option { None } + + fn record_batch_schema(&self) -> Option { + Some(self.part.batch.schema()) + } } impl IterBuilder for MultiBulkRangeIterBuilder { @@ -850,6 +854,10 @@ impl IterBuilder for MultiBulkRangeIterBuilder { fn encoded_range(&self) -> Option { None } + + fn record_batch_schema(&self) -> Option { + self.part.record_batch_schema() + } } /// Iterator builder for encoded bulk range diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index 71e49776c0..8c6b371a62 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -14,6 +14,7 @@ //! Bulk part encoder/decoder. +use std::borrow::Cow; use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -23,7 +24,7 @@ use api::v1::bulk_wal_entry::Body; use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry}; use bytes::Bytes; use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage}; -use common_recordbatch::DfRecordBatch as RecordBatch; +use common_recordbatch::{DfRecordBatch as RecordBatch, recordbatch}; use common_time::Timestamp; use common_time::timestamp::TimeUnit; use datatypes::arrow; @@ -39,7 +40,9 @@ use datatypes::arrow::datatypes::{ }; use datatypes::arrow_array::BinaryArray; use datatypes::data_type::DataType; +use datatypes::extension::json::is_json_extension_type; use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector}; +use datatypes::types::json_type; use datatypes::value::{Value, ValueRef}; use datatypes::vectors::Helper; use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef}; @@ -62,7 +65,7 @@ use table::predicate::Predicate; use crate::error::{ self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu, - InvalidRequestSnafu, NewRecordBatchSnafu, Result, UnexpectedSnafu, + InvalidRequestSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, UnexpectedSnafu, }; use crate::memtable::bulk::context::BulkIterContextRef; use crate::memtable::bulk::part_reader::EncodedBulkPartIter; @@ -436,11 +439,10 @@ impl UnorderedPart { return Ok(Some(self.parts[0].batch.clone())); } - // Get the schema from the first part + // Get the schema from the first part and normalize JSON2 columns across all parts. let schema = self.parts[0].batch.schema(); + let (schema, batches) = normalize_json_columns_for_concat(schema, &self.parts)?; - // Concatenate all record batches - let batches: Vec = self.parts.iter().map(|p| p.batch.clone()).collect(); let concatenated = arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?; @@ -477,6 +479,75 @@ impl UnorderedPart { self.max_timestamp = i64::MIN; self.max_sequence = 0; } + + pub(crate) fn parts(&self) -> &[BulkPart] { + &self.parts + } +} + +fn normalize_json_columns_for_concat( + base_schema: SchemaRef, + parts: &[BulkPart], +) -> Result<(SchemaRef, Vec)> { + let mut merged_json_types = HashMap::new(); + for (index, field) in base_schema.fields().iter().enumerate() { + if !is_json_extension_type(field) { + continue; + } + + let merged = parts + .iter() + .map(|x| Cow::Borrowed(x.batch.schema_ref().field(index).data_type())) + .reduce(|acc, e| { + Cow::Owned(json_type::merge_as_json_type(acc.as_ref(), e.as_ref()).into_owned()) + }); + if let Some(merged) = merged + && merged.as_ref() != field.data_type() + { + merged_json_types.insert(index, merged.into_owned()); + } + } + + if merged_json_types.is_empty() { + let batches = parts.iter().map(|p| p.batch.clone()).collect(); + return Ok((base_schema, batches)); + } + + let fields = base_schema + .fields() + .iter() + .enumerate() + .map(|(index, field)| { + if let Some(data_type) = merged_json_types.get(&index) { + Arc::new( + Field::new(field.name().clone(), data_type.clone(), field.is_nullable()) + .with_metadata(field.metadata().clone()), + ) + } else { + field.clone() + } + }) + .collect::>(); + let normalized_schema = Arc::new(Schema::new(fields)); + + let mut normalized_batches = Vec::with_capacity(parts.len()); + for part in parts { + let mut columns = Vec::with_capacity(part.batch.num_columns()); + for (index, column) in part.batch.columns().iter().enumerate() { + if let Some(target_type) = merged_json_types.get(&index) { + columns.push( + recordbatch::align_json_array(column, target_type).context(RecordBatchSnafu)?, + ); + } else { + columns.push(column.clone()); + } + } + let batch = RecordBatch::try_new(normalized_schema.clone(), columns) + .context(NewRecordBatchSnafu)?; + normalized_batches.push(batch); + } + + Ok((normalized_schema, normalized_batches)) } /// More accurate estimation of the size of a record batch. @@ -693,7 +764,8 @@ impl BulkPartConverter { columns.push(values.sequence.to_arrow_array()); columns.push(values.op_type.to_arrow_array()); - let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?; + let schema = align_schema_with_json_array(self.schema, &columns); + let batch = RecordBatch::try_new(schema, columns).context(NewRecordBatchSnafu)?; // Sorts the record batch. let batch = sort_primary_key_record_batch(&batch)?; @@ -708,6 +780,26 @@ impl BulkPartConverter { } } +fn align_schema_with_json_array(schema: SchemaRef, columns: &[ArrayRef]) -> SchemaRef { + if schema.fields().iter().all(|f| !is_json_extension_type(f)) { + return schema; + } + + let mut fields = Vec::with_capacity(schema.fields().len()); + for (field, array) in schema.fields().iter().zip(columns) { + if !is_json_extension_type(field) { + fields.push(field.clone()); + continue; + } + + let mut field = field.as_ref().clone(); + field.set_data_type(array.data_type().clone()); + fields.push(Arc::new(field)); + } + + Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone())) +} + fn new_primary_key_column_builders( metadata: &RegionMetadata, capacity: usize, @@ -1346,6 +1438,11 @@ impl MultiBulkPart { self.series_count } + /// Returns the schema of batches in this part. + pub(crate) fn record_batch_schema(&self) -> Option { + self.batches.first().map(|batch| batch.schema()) + } + /// Returns the number of record batches in this part. pub fn num_batches(&self) -> usize { self.batches.len() diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index fdf051e1b7..db82e74596 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -917,7 +917,9 @@ impl ValueBuilder { size += field_value.data_size(); if !field_value.is_null() || self.fields[idx].is_some() { if let Some(field) = self.fields[idx].as_mut() { - let _ = field.push(field_value); + field + .push(field_value) + .unwrap_or_else(|e| panic!("Failed to push field value: {e:?}")); } else { let mut mutable_vector = if let ConcreteDataType::String(_) = &self.field_types[idx] { diff --git a/src/mito2/src/read/flat_projection.rs b/src/mito2/src/read/flat_projection.rs index 7394a3c4ab..25d522b48a 100644 --- a/src/mito2/src/read/flat_projection.rs +++ b/src/mito2/src/read/flat_projection.rs @@ -40,6 +40,7 @@ use crate::sst::{ /// /// This mapper support duplicate and unsorted projection indices. /// The output schema is determined by the projection indices. +#[derive(Clone)] pub struct FlatProjectionMapper { /// Metadata of the region. metadata: RegionMetadataRef, @@ -237,6 +238,10 @@ impl FlatProjectionMapper { self.output_schema.clone() } + pub(crate) fn with_output_schema(&mut self, output_schema: SchemaRef) { + self.output_schema = output_schema; + } + /// Returns an empty [RecordBatch]. pub(crate) fn empty_record_batch(&self) -> RecordBatch { RecordBatch::new_empty(self.output_schema.clone()) diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs index 2c000e7bdc..c0b57fb27c 100644 --- a/src/mito2/src/read/projection.rs +++ b/src/mito2/src/read/projection.rs @@ -40,6 +40,7 @@ use crate::read::flat_projection::FlatProjectionMapper; const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384; /// Wrapper enum for different projection mapper implementations. +#[derive(Clone)] pub enum ProjectionMapper { /// Projection mapper for primary key format. PrimaryKey(PrimaryKeyProjectionMapper), @@ -148,6 +149,12 @@ impl ProjectionMapper { } } + pub(crate) fn with_flat_output_schema(&mut self, output_schema: SchemaRef) { + if let ProjectionMapper::Flat(m) = self { + m.with_output_schema(output_schema) + } + } + /// Returns an empty [RecordBatch]. // TODO(yingwen): This is unused now. Use it after we finishing the flat format. pub fn empty_record_batch(&self) -> RecordBatch { @@ -159,6 +166,7 @@ impl ProjectionMapper { } /// Handles projection and converts a projected [Batch] to a projected [RecordBatch]. +#[derive(Clone)] pub struct PrimaryKeyProjectionMapper { /// Metadata of the region. metadata: RegionMetadataRef, diff --git a/src/mito2/src/read/range.rs b/src/mito2/src/read/range.rs index 894b0bb774..2180a49c55 100644 --- a/src/mito2/src/read/range.rs +++ b/src/mito2/src/read/range.rs @@ -15,6 +15,7 @@ //! Structs for partition ranges. use common_time::Timestamp; +use datatypes::arrow::datatypes::SchemaRef; use smallvec::{SmallVec, smallvec}; use store_api::region_engine::PartitionRange; use store_api::storage::TimeSeriesDistribution; @@ -477,6 +478,11 @@ impl MemRangeBuilder { pub(crate) fn stats(&self) -> &MemtableStats { &self.stats } + + /// Returns the record batch schema for this memtable range if available. + pub(crate) fn record_batch_schema(&self) -> Option { + self.range.record_batch_schema() + } } #[cfg(test)] diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 4f4ec28d8d..9fb120e197 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -14,6 +14,7 @@ //! Scans a region according to the scan request. +use std::borrow::Cow; use std::collections::HashSet; use std::fmt; use std::num::NonZeroU64; @@ -30,6 +31,10 @@ use common_time::range::TimestampRange; use datafusion_common::Column; use datafusion_expr::Expr; use datafusion_expr::utils::expr_to_columns; +use datatypes::data_type::ConcreteDataType; +use datatypes::extension::json::is_json_extension_type; +use datatypes::schema::Schema; +use datatypes::types::json_type; use futures::StreamExt; use partition::expr::PartitionExpr; use smallvec::SmallVec; @@ -566,6 +571,8 @@ impl ScanRegion { } else { input }; + + let input = maybe_concretize_flat_json2_schema(input); Ok(input) } @@ -792,6 +799,59 @@ impl ScanRegion { } } +fn maybe_concretize_flat_json2_schema(input: ScanInput) -> ScanInput { + let Some(flat_mapper) = input.mapper.as_flat() else { + return input; + }; + let output_schema = flat_mapper.output_schema(); + let output_arrow_schema = output_schema.arrow_schema(); + + let mem_schemas: Vec<_> = input + .memtables + .iter() + .filter_map(|mem| mem.record_batch_schema()) + .collect(); + if mem_schemas.is_empty() { + return input; + } + + let mut column_schemas = output_schema.column_schemas().to_vec(); + let mut changed = false; + for (idx, column_schema) in column_schemas.iter_mut().enumerate() { + let output_field = &output_arrow_schema.fields()[idx]; + if !is_json_extension_type(output_field) { + continue; + } + + let merged = mem_schemas + .iter() + .filter_map(|x| { + x.column_with_name(&column_schema.name) + .map(|(_, f)| Cow::Borrowed(f.data_type())) + }) + .reduce(|acc, e| { + Cow::Owned(json_type::merge_as_json_type(acc.as_ref(), e.as_ref()).into_owned()) + }); + if let Some(merged) = merged + && merged.as_ref() != output_field.data_type() + { + column_schema.data_type = ConcreteDataType::from_arrow_type(merged.as_ref()); + changed = true; + } + } + + if changed { + let mut mapper = Arc::unwrap_or_clone(input.mapper); + mapper.with_flat_output_schema(Arc::new(Schema::new(column_schemas))); + ScanInput { + mapper: Arc::new(mapper), + ..input + } + } else { + input + } +} + /// Returns true if the time range of a SST `file` matches the `predicate`. fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool { if predicate == &TimestampRange::min_to_max() { diff --git a/src/mito2/src/sst/parquet/flat_format.rs b/src/mito2/src/sst/parquet/flat_format.rs index d6b061e468..6c7adfa388 100644 --- a/src/mito2/src/sst/parquet/flat_format.rs +++ b/src/mito2/src/sst/parquet/flat_format.rs @@ -87,6 +87,7 @@ impl FlatWriteFormat { } /// Gets the arrow schema to store in parquet. + #[cfg(test)] pub(crate) fn arrow_schema(&self) -> &SchemaRef { &self.arrow_schema } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 460b18f3a3..595d21fe68 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -31,6 +31,7 @@ use datatypes::arrow::error::ArrowError; use datatypes::arrow::record_batch::RecordBatch; use datatypes::data_type::ConcreteDataType; use datatypes::prelude::DataType; +use datatypes::schema::ext::ArrowSchemaExt; use mito_codec::row_converter::build_primary_key_codec; use object_store::ObjectStore; use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection}; @@ -412,7 +413,11 @@ impl ParquetReaderBuilder { let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied()); // Computes the field levels. - let hint = Some(read_format.arrow_schema().fields()); + let hint = if read_format.arrow_schema().has_json_extension_field() { + None + } else { + Some(read_format.arrow_schema().fields()) + }; let field_levels = parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint) .context(ReadDataPartSnafu)?; diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index b207f11ef8..ab238fc838 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -450,7 +450,7 @@ where let arrow_batch = flat_format.convert_batch(&record_batch)?; let start = Instant::now(); - self.maybe_init_writer(flat_format.arrow_schema(), opts) + self.maybe_init_writer(arrow_batch.schema_ref(), opts) .await? .write(&arrow_batch) .await diff --git a/src/operator/src/req_convert/insert/stmt_to_region.rs b/src/operator/src/req_convert/insert/stmt_to_region.rs index e2e0969035..83c126b9b4 100644 --- a/src/operator/src/req_convert/insert/stmt_to_region.rs +++ b/src/operator/src/req_convert/insert/stmt_to_region.rs @@ -301,12 +301,22 @@ impl<'a, 'b> JsonColumnTypeUpdater<'a, 'b> { .or_insert_with(|| value_type.clone()); if !merged_type.is_include(&value_type) { - merged_type.merge(&value_type).map_err(|e| { + if column_schema + .data_type + .as_json() + .map(|x| x.is_native_type()) + .unwrap_or(false) + { + merged_type.merge(&value_type) + } else { + merged_type.merge_with_lifting(&value_type) + } + .map_err(|e| { InvalidInsertRequestSnafu { reason: format!(r#"cannot merge "{value_type}" into "{merged_type}": {e}"#), } .build() - })?; + })? } } Ok(()) @@ -323,7 +333,17 @@ impl<'a, 'b> JsonColumnTypeUpdater<'a, 'b> { for (column_name, merged_type) in self.merged_value_types.iter() { let Some(column_type) = insert_columns .iter() - .find_map(|x| (&x.name == column_name).then(|| x.data_type.as_json())) + .find_map(|x| { + (&x.name == column_name).then(|| { + if let ConcreteDataType::Json(t) = &x.data_type + && t.is_native_type() + { + Some(t) + } else { + None + } + }) + }) .flatten() else { continue; diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 863dd9c0f9..eac32679d2 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -15,6 +15,7 @@ //! Planner, QueryEngine implementations based on DataFusion. mod error; +mod json2_expr_planner; mod planner; use std::any::Any; diff --git a/src/query/src/datafusion/json2_expr_planner.rs b/src/query/src/datafusion/json2_expr_planner.rs new file mode 100644 index 0000000000..bdf864698b --- /dev/null +++ b/src/query/src/datafusion/json2_expr_planner.rs @@ -0,0 +1,54 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use arrow_schema::Field; +use arrow_schema::extension::ExtensionType; +use common_function::scalars::json::json2_get::Json2GetFunction; +use common_function::scalars::udf::create_udf; +use datafusion_common::{Column, Result, ScalarValue, TableReference}; +use datafusion_expr::Expr; +use datafusion_expr::expr::ScalarFunction; +use datafusion_expr::planner::{ExprPlanner, PlannerResult}; +use datatypes::extension::json::JsonExtensionType; + +#[derive(Debug)] +pub(crate) struct Json2ExprPlanner; + +fn json2_get(base: Expr, path: String) -> Expr { + let args = vec![base, Expr::Literal(ScalarValue::Utf8(Some(path)), None)]; + let function = create_udf(Arc::new(Json2GetFunction::default())); + Expr::ScalarFunction(ScalarFunction::new_udf(Arc::new(function), args)) +} + +impl ExprPlanner for Json2ExprPlanner { + fn plan_compound_identifier( + &self, + field: &Field, + qualifier: Option<&TableReference>, + nested_names: &[String], + ) -> Result>> { + if field.extension_type_name() != Some(JsonExtensionType::NAME) { + return Ok(PlannerResult::Original(Vec::new())); + } + + let path = nested_names.join("."); + let column = Column::from((qualifier, field)); + Ok(PlannerResult::Planned(json2_get( + Expr::Column(column), + path, + ))) + } +} diff --git a/src/query/src/datafusion/planner.rs b/src/query/src/datafusion/planner.rs index d9c74b9d5a..7088111774 100644 --- a/src/query/src/datafusion/planner.rs +++ b/src/query/src/datafusion/planner.rs @@ -38,6 +38,7 @@ use datafusion_sql::parser::Statement as DfStatement; use session::context::QueryContextRef; use snafu::{Location, ResultExt}; +use crate::datafusion::json2_expr_planner::Json2ExprPlanner; use crate::error::{CatalogSnafu, Result}; use crate::query_engine::{DefaultPlanDecoder, QueryEngineState}; @@ -87,6 +88,9 @@ impl DfContextProviderAdapter { .map(|format| (format.get_ext().to_lowercase(), format)) .collect(); + let mut expr_planners = SessionStateDefaults::default_expr_planners(); + expr_planners.insert(0, Arc::new(Json2ExprPlanner)); + Ok(Self { engine_state, session_state, @@ -94,7 +98,7 @@ impl DfContextProviderAdapter { table_provider, query_ctx, file_formats, - expr_planners: SessionStateDefaults::default_expr_planners(), + expr_planners, }) } } diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index 211fc5598e..642479ca64 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -153,7 +153,16 @@ pub fn column_to_schema( column_schema.set_inverted_index(column.extensions.inverted_index_options.is_some()); - if matches!(column.data_type(), SqlDataType::JSON) { + let is_json2_column = if let SqlDataType::Custom(object_name, _) = column.data_type() { + object_name + .0 + .first() + .map(|x| x.to_string_unquoted().eq_ignore_ascii_case("JSON2")) + .unwrap_or_default() + } else { + false + }; + if is_json2_column || matches!(column.data_type(), SqlDataType::JSON) { let settings = column .extensions .build_json_structure_settings()? @@ -290,22 +299,25 @@ pub fn sql_data_type_to_concrete_data_type( }; Ok(ConcreteDataType::Json(JsonType::new(format))) } - // Vector type - SqlDataType::Custom(name, d) - if name.0.as_slice().len() == 1 - && name.0.as_slice()[0] - .to_string_unquoted() - .to_ascii_uppercase() - == VECTOR_TYPE_NAME - && d.len() == 1 => - { - let dim = d[0].parse().map_err(|e| { - error::ParseSqlValueSnafu { - msg: format!("Failed to parse vector dimension: {}", e), + // Vector type and JSON2 type + SqlDataType::Custom(name, d) if name.0.len() == 1 => { + let name = name.0[0].to_string_unquoted().to_ascii_uppercase(); + match name.as_str() { + VECTOR_TYPE_NAME if d.len() == 1 => { + let dim = d[0].parse().map_err(|e| { + error::ParseSqlValueSnafu { + msg: format!(r#"Failed to parse vector dimension "{}": {}"#, d[0], e), + } + .build() + })?; + Ok(ConcreteDataType::vector_datatype(dim)) } - .build() - })?; - Ok(ConcreteDataType::vector_datatype(dim)) + "JSON2" => Ok(ConcreteDataType::Json(JsonType::new(JsonFormat::Json2))), + _ => error::SqlTypeNotSupportedSnafu { + t: data_type.clone(), + } + .fail(), + } } _ => error::SqlTypeNotSupportedSnafu { t: data_type.clone(), diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index 817b31518d..e7fa9d7114 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -377,32 +377,35 @@ impl ColumnExtensions { None }; - options + let format = options .get(JSON_OPT_FORMAT) - .map(|format| match format { - JSON_FORMAT_FULL_STRUCTURED => Ok(JsonStructureSettings::Structured(fields)), - JSON_FORMAT_PARTIAL => { - let fields = fields.map(|fields| { - let mut fields = Arc::unwrap_or_clone(fields.fields()); - fields.push(datatypes::types::StructField::new( - JsonStructureSettings::RAW_FIELD.to_string(), - ConcreteDataType::string_datatype(), - true, - )); - StructType::new(Arc::new(fields)) - }); - Ok(JsonStructureSettings::PartialUnstructuredByKey { - fields, - unstructured_keys, - }) + .unwrap_or(JSON_FORMAT_FULL_STRUCTURED); + let settings = match format { + JSON_FORMAT_FULL_STRUCTURED => JsonStructureSettings::Structured(fields), + JSON_FORMAT_PARTIAL => { + let fields = fields.map(|fields| { + let mut fields = Arc::unwrap_or_clone(fields.fields()); + fields.push(datatypes::types::StructField::new( + JsonStructureSettings::RAW_FIELD.to_string(), + ConcreteDataType::string_datatype(), + true, + )); + StructType::new(Arc::new(fields)) + }); + JsonStructureSettings::PartialUnstructuredByKey { + fields, + unstructured_keys, } - JSON_FORMAT_RAW => Ok(JsonStructureSettings::UnstructuredRaw), - _ => InvalidSqlSnafu { + } + JSON_FORMAT_RAW => JsonStructureSettings::UnstructuredRaw, + _ => { + return InvalidSqlSnafu { msg: format!("unknown JSON datatype 'format': {format}"), } - .fail(), - }) - .transpose() + .fail(); + } + }; + Ok(Some(settings)) } pub fn set_json_structure_settings(&mut self, settings: JsonStructureSettings) { diff --git a/tests/cases/standalone/common/types/json/json-structured.result b/tests/cases/standalone/common/types/json/json-structured.result deleted file mode 100644 index be04e2652d..0000000000 --- a/tests/cases/standalone/common/types/json/json-structured.result +++ /dev/null @@ -1,82 +0,0 @@ -CREATE TABLE t (ts TIMESTAMP TIME INDEX, j JSON(format = "structured") DEFAULT '{"foo": "bar"}'); - -Error: 1001(Unsupported), Unsupported default constraint for column: 'j', reason: json column cannot have a default value - -CREATE TABLE t (ts TIMESTAMP TIME INDEX, j JSON(format = "structured")); - -Affected Rows: 0 - -DESC TABLE t; - -+--------+----------------------+-----+------+---------+---------------+ -| Column | Type | Key | Null | Default | Semantic Type | -+--------+----------------------+-----+------+---------+---------------+ -| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP | -| j | Json<""> | | YES | | FIELD | -+--------+----------------------+-----+------+---------+---------------+ - -INSERT INTO t VALUES -(1762128001000, '{"int": 1}'), -(1762128002000, '{"int": 2, "list": [0.1, 0.2, 0.3]}'), -(1762128003000, '{"int": 3, "list": [0.4, 0.5, 0.6], "nested": {"a": {"x": "hello"}, "b": {"y": -1}}}'); - -Affected Rows: 3 - -DESC TABLE t; - -+--------+---------------------------------------------------------------------------------------------------+-----+------+---------+---------------+ -| Column | Type | Key | Null | Default | Semantic Type | -+--------+---------------------------------------------------------------------------------------------------+-----+------+---------+---------------+ -| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP | -| j | Json<{"int":"","list":[""],"nested":{"a":{"x":""},"b":{"y":""}}}> | | YES | | FIELD | -+--------+---------------------------------------------------------------------------------------------------+-----+------+---------+---------------+ - -INSERT INTO t VALUES -(1762128004000, '{"int": 4, "bool": true, "nested": {"a": {"y": 1}}}'), -(1762128005000, '{"int": 5, "bool": false, "nested": {"b": {"x": "world"}}}'); - -Affected Rows: 2 - -DESC TABLE t; - -+--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+ -| Column | Type | Key | Null | Default | Semantic Type | -+--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+ -| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP | -| j | Json<{"bool":"","int":"","list":[""],"nested":{"a":{"x":"","y":""},"b":{"x":"","y":""}}}> | | YES | | FIELD | -+--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+ - -INSERT INTO t VALUES (1762128006000, '{"int": 6, "list": [-6.0], "bool": true, "nested": {"a": {"x": "ax", "y": 66}, "b": {"y": -66, "x": "bx"}}}'); - -Affected Rows: 1 - -DESC TABLE t; - -+--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+ -| Column | Type | Key | Null | Default | Semantic Type | -+--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+ -| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP | -| j | Json<{"bool":"","int":"","list":[""],"nested":{"a":{"x":"","y":""},"b":{"x":"","y":""}}}> | | YES | | FIELD | -+--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+ - -INSERT INTO t VALUES (1762128011000, '{}'); - -Error: 1004(InvalidArguments), Invalid InsertRequest, reason: empty json object is not supported, consider adding a dummy field - -SELECT ts, j FROM t order by ts; - -+---------------------+----------------------------------------------------------------------------------------+ -| ts | j | -+---------------------+----------------------------------------------------------------------------------------+ -| 2025-11-03T00:00:01 | {bool: , int: 1, list: , nested: } | -| 2025-11-03T00:00:02 | {bool: , int: 2, list: [0.1, 0.2, 0.3], nested: } | -| 2025-11-03T00:00:03 | {bool: , int: 3, list: [0.4, 0.5, 0.6], nested: {a: {x: hello, y: }, b: {x: , y: -1}}} | -| 2025-11-03T00:00:04 | {bool: true, int: 4, list: , nested: {a: {x: , y: 1}, b: }} | -| 2025-11-03T00:00:05 | {bool: false, int: 5, list: , nested: {a: , b: {x: world, y: }}} | -| 2025-11-03T00:00:06 | {bool: true, int: 6, list: [-6.0], nested: {a: {x: ax, y: 66}, b: {x: bx, y: -66}}} | -+---------------------+----------------------------------------------------------------------------------------+ - -DROP table t; - -Affected Rows: 0 - diff --git a/tests/cases/standalone/common/types/json/json-structured.sql b/tests/cases/standalone/common/types/json/json-structured.sql deleted file mode 100644 index 8bb10b4b0e..0000000000 --- a/tests/cases/standalone/common/types/json/json-structured.sql +++ /dev/null @@ -1,28 +0,0 @@ -CREATE TABLE t (ts TIMESTAMP TIME INDEX, j JSON(format = "structured") DEFAULT '{"foo": "bar"}'); - -CREATE TABLE t (ts TIMESTAMP TIME INDEX, j JSON(format = "structured")); - -DESC TABLE t; - -INSERT INTO t VALUES -(1762128001000, '{"int": 1}'), -(1762128002000, '{"int": 2, "list": [0.1, 0.2, 0.3]}'), -(1762128003000, '{"int": 3, "list": [0.4, 0.5, 0.6], "nested": {"a": {"x": "hello"}, "b": {"y": -1}}}'); - -DESC TABLE t; - -INSERT INTO t VALUES -(1762128004000, '{"int": 4, "bool": true, "nested": {"a": {"y": 1}}}'), -(1762128005000, '{"int": 5, "bool": false, "nested": {"b": {"x": "world"}}}'); - -DESC TABLE t; - -INSERT INTO t VALUES (1762128006000, '{"int": 6, "list": [-6.0], "bool": true, "nested": {"a": {"x": "ax", "y": 66}, "b": {"y": -66, "x": "bx"}}}'); - -DESC TABLE t; - -INSERT INTO t VALUES (1762128011000, '{}'); - -SELECT ts, j FROM t order by ts; - -DROP table t; diff --git a/tests/cases/standalone/common/types/json/json2.result b/tests/cases/standalone/common/types/json/json2.result new file mode 100644 index 0000000000..f0644cbcab --- /dev/null +++ b/tests/cases/standalone/common/types/json/json2.result @@ -0,0 +1,129 @@ +create table json2_table +( + ts timestamp time index, + j json2 +) with ( + 'append_mode' = 'true', + 'sst_format' = 'flat', +); + +Affected Rows: 0 + +insert into json2_table (ts, j) +values (1, '{"a": {"b": 1}, "c": "s1"}'), + (2, '{"a": {"b": 2}, "c": "s2"}'); + +Affected Rows: 2 + +admin flush_table('json2_table'); + ++----------------------------------+ +| ADMIN flush_table('json2_table') | ++----------------------------------+ +| 0 | ++----------------------------------+ + +insert into json2_table (ts, j) +values (3, '{"a": {"b": 3}, "c": "s3"}'); + +Affected Rows: 1 + +insert into json2_table +values (4, '{"a": {"b": 4}}'), + (5, '{"a": {}, "c": "s5"}'), + (6, '{"c": "s6"}'); + +Affected Rows: 3 + +admin flush_table('json2_table'); + ++----------------------------------+ +| ADMIN flush_table('json2_table') | ++----------------------------------+ +| 0 | ++----------------------------------+ + +insert into json2_table +values (7, '{"a": {"b": "s7"}, "c": [1]}'), + (8, '{"a": {"b": 8}, "c": "s8"}'); + +Affected Rows: 2 + +insert into json2_table +values (9, '{"a": {"x": true}, "c": "s9"}'), + (10, '{"a": {"b": 10}, "y": false}'); + +Affected Rows: 2 + +select j.a.b from json2_table order by ts; + ++--------------------------------------+ +| json2_get(json2_table.j,Utf8("a.b")) | ++--------------------------------------+ +| 1 | +| 2 | +| 3 | +| 4 | +| | +| | +| s7 | +| 8 | +| | +| 10 | ++--------------------------------------+ + +select j.a.x from json2_table order by ts; + ++--------------------------------------+ +| json2_get(json2_table.j,Utf8("a.x")) | ++--------------------------------------+ +| | +| | +| | +| | +| | +| | +| | +| | +| true | +| | ++--------------------------------------+ + +select j.c from json2_table order by ts; + ++------------------------------------+ +| json2_get(json2_table.j,Utf8("c")) | ++------------------------------------+ +| s1 | +| s2 | +| s3 | +| | +| s5 | +| s6 | +| [1] | +| s8 | +| s9 | +| | ++------------------------------------+ + +select j.y from json2_table order by ts; + ++------------------------------------+ +| json2_get(json2_table.j,Utf8("y")) | ++------------------------------------+ +| | +| | +| | +| | +| | +| | +| | +| | +| | +| false | ++------------------------------------+ + +drop table json2_table; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/types/json/json2.sql b/tests/cases/standalone/common/types/json/json2.sql new file mode 100644 index 0000000000..a6ea4eb32d --- /dev/null +++ b/tests/cases/standalone/common/types/json/json2.sql @@ -0,0 +1,42 @@ +create table json2_table +( + ts timestamp time index, + j json2 +) with ( + 'append_mode' = 'true', + 'sst_format' = 'flat', +); + +insert into json2_table (ts, j) +values (1, '{"a": {"b": 1}, "c": "s1"}'), + (2, '{"a": {"b": 2}, "c": "s2"}'); + +admin flush_table('json2_table'); + +insert into json2_table (ts, j) +values (3, '{"a": {"b": 3}, "c": "s3"}'); + +insert into json2_table +values (4, '{"a": {"b": 4}}'), + (5, '{"a": {}, "c": "s5"}'), + (6, '{"c": "s6"}'); + +admin flush_table('json2_table'); + +insert into json2_table +values (7, '{"a": {"b": "s7"}, "c": [1]}'), + (8, '{"a": {"b": 8}, "c": "s8"}'); + +insert into json2_table +values (9, '{"a": {"x": true}, "c": "s9"}'), + (10, '{"a": {"b": 10}, "y": false}'); + +select j.a.b from json2_table order by ts; + +select j.a.x from json2_table order by ts; + +select j.c from json2_table order by ts; + +select j.y from json2_table order by ts; + +drop table json2_table;