From 2d430091f4798d9c11b793ee44434c30175dff50 Mon Sep 17 00:00:00 2001 From: luofucong Date: Thu, 21 May 2026 19:54:00 +0800 Subject: [PATCH] json2 (?) data-driven concretize (?) select (?) compaction Signed-off-by: luofucong --- src/common/recordbatch/src/recordbatch.rs | 2 +- src/datanode/src/lib.rs | 2 + src/datatypes/src/json.rs | 120 ++++++++++++------ src/datatypes/src/json/value.rs | 36 ++++-- src/datatypes/src/types/json_type.rs | 31 ++++- src/datatypes/src/vectors/json/array.rs | 35 ++++- src/mito2/src/flush.rs | 12 +- src/mito2/src/memtable.rs | 65 ++++++++++ src/mito2/src/memtable/bulk.rs | 8 ++ src/mito2/src/memtable/bulk/part.rs | 7 +- src/mito2/src/read/flat_merge.rs | 31 ++++- src/mito2/src/read/flat_projection.rs | 1 + src/mito2/src/read/range.rs | 7 + src/mito2/src/sst/parquet/flat_format.rs | 6 + tests-integration/tests/jsonbench.rs | 73 ++++------- .../standalone/common/types/json/json2.result | 55 ++++++++ .../standalone/common/types/json/json2.sql | 13 ++ 17 files changed, 390 insertions(+), 114 deletions(-) diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs index 4289714afd..413d89fb78 100644 --- a/src/common/recordbatch/src/recordbatch.rs +++ b/src/common/recordbatch/src/recordbatch.rs @@ -355,7 +355,7 @@ pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Resul Ok(RecordBatch::from_df_record_batch(schema, record_batch)) } -fn maybe_align_json_array_with_schema( +pub fn maybe_align_json_array_with_schema( schema: &ArrowSchemaRef, arrays: Vec, ) -> Result> { diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index 7e0db3cabc..f1c33a4d3b 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![recursion_limit = "256"] + pub mod alive_keeper; pub mod config; pub mod datanode; diff --git a/src/datatypes/src/json.rs b/src/datatypes/src/json.rs index db657abbcb..1972601695 100644 --- a/src/datatypes/src/json.rs +++ b/src/datatypes/src/json.rs @@ -26,12 +26,12 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value as Json}; -use snafu::{OptionExt, ResultExt, ensure}; +use snafu::{OptionExt, ResultExt}; use crate::error::{self, InvalidJsonSnafu, Result, SerializeSnafu}; use crate::json::value::{JsonValue, JsonVariant}; use crate::types::json_type::{JsonNativeType, JsonNumberType, JsonObjectType}; -use crate::types::{StructField, StructType}; +use crate::types::{JsonType, StructField, StructType}; use crate::value::{ListValue, StructValue, Value}; /// The configuration of JSON encoding @@ -305,32 +305,45 @@ fn encode_json_array_with_context<'a>( ) -> Result { let json_array_len = json_array.len(); let mut items = Vec::with_capacity(json_array_len); - let mut element_type = item_type.cloned(); for (index, value) in json_array.into_iter().enumerate() { let array_context = context.with_key(&index.to_string()); - let item_value = - encode_json_value_with_context(value, element_type.as_ref(), &array_context)?; - let item_type = item_value.json_type().native_type().clone(); - items.push(item_value.into_variant()); + let item_value = encode_json_value_with_context(value, None, &array_context)?; + items.push(item_value); + } - // Determine the common type for the list - if let Some(current_type) = &element_type { - // It's valid for json array to have different types of items, for example, - // ["a string", 1]. However, the `JsonValue` will be converted to Arrow list array, - // which requires all items have exactly same type. So we forbid the different types - // case here. Besides, it's not common for items in a json array to differ. So I think - // we are good here. - ensure!( - item_type == *current_type, - error::InvalidJsonSnafu { - value: "all items in json array must have the same type" - } - ); - } else { - element_type = Some(item_type); + // In specification, it's valid for a JSON array to have different types of items, for example, + // ["a string", 1]. However, in implementation, the `JsonValue` will be converted to Arrow list + // array, which requires all items have exactly the same type. So we merge out the maybe + // different item types to a unified type, and align all the item values to it. + + let provided_item_type = item_type.map(|x| JsonType::new_json2(x.clone())); + let merged_item_type = if let Some((first, rests)) = items.split_first() { + let mut merged = first.json_type().clone(); + for rest in rests.iter().map(|x| x.json_type()) { + merged.merge(rest)?; + } + Some(merged) + } else { + None + }; + let unified_item_type = match (provided_item_type, merged_item_type) { + (Some(mut x), Some(y)) => { + x.merge(&y)?; + Some(x) + } + (Some(x), None) | (None, Some(x)) => Some(x), + (None, None) => None, + }; + if let Some(unified_item_type) = unified_item_type { + for item in &mut items { + item.try_align(&unified_item_type)?; } } + let items = items + .into_iter() + .map(|x| x.into_variant()) + .collect::>(); Ok(JsonValue::new(JsonVariant::Array(items))) } @@ -729,7 +742,7 @@ where #[cfg(test)] mod tests { - + use common_base::bytes::Bytes; use serde_json::json; use super::*; @@ -1050,11 +1063,21 @@ mod tests { fn test_encode_json_array_mixed_types() { let json = json!([1, "hello", true, 3.15]); let settings = JsonStructureSettings::Structured(None); - let result = settings.encode_with_type(json, None); - assert_eq!( - result.unwrap_err().to_string(), - "Invalid JSON: all items in json array must have the same type" - ); + let result = settings + .encode_with_type(json, None) + .unwrap() + .into_json_inner() + .unwrap(); + + if let Value::List(list_value) = result { + assert_eq!(list_value.items().len(), 4); + assert_eq!( + list_value.datatype(), + Arc::new(ConcreteDataType::binary_datatype()) + ); + } else { + panic!("Expected List value"); + } } #[test] @@ -1276,12 +1299,12 @@ mod tests { #[test] fn test_encode_json_array_with_item_type() { let json = json!([1, 2, 3]); - let item_type = Arc::new(ConcreteDataType::uint64_datatype()); + let item_type = Arc::new(ConcreteDataType::int64_datatype()); let settings = JsonStructureSettings::Structured(None); let result = settings .encode_with_type( json, - Some(&JsonNativeType::Array(Box::new(JsonNativeType::u64()))), + Some(&JsonNativeType::Array(Box::new(JsonNativeType::i64()))), ) .unwrap() .into_json_inner() @@ -1289,9 +1312,9 @@ mod tests { if let Value::List(list_value) = result { assert_eq!(list_value.items().len(), 3); - assert_eq!(list_value.items()[0], Value::UInt64(1)); - assert_eq!(list_value.items()[1], Value::UInt64(2)); - assert_eq!(list_value.items()[2], Value::UInt64(3)); + assert_eq!(list_value.items()[0], Value::Int64(1)); + assert_eq!(list_value.items()[1], Value::Int64(2)); + assert_eq!(list_value.items()[2], Value::Int64(3)); assert_eq!(list_value.datatype(), item_type); } else { panic!("Expected List value"); @@ -2249,11 +2272,32 @@ mod tests { )])), ); - let decoded_struct = settings.decode_struct(array_struct); - assert_eq!( - decoded_struct.unwrap_err().to_string(), - "Invalid JSON: all items in json array must have the same type" - ); + let decoded_struct = settings.decode_struct(array_struct).unwrap(); + let fields = decoded_struct.struct_type().fields(); + let decoded_fields: Vec<&str> = fields.iter().map(|f| f.name()).collect(); + assert!(decoded_fields.contains(&"value")); + + if let Value::List(list_value) = &decoded_struct.items()[0] { + assert_eq!(list_value.items().len(), 4); + assert_eq!( + list_value.items()[0], + Value::Binary(Bytes::from("1".as_bytes())) + ); + assert_eq!( + list_value.items()[1], + Value::Binary(Bytes::from(r#""hello""#.as_bytes())) + ); + assert_eq!( + list_value.items()[2], + Value::Binary(Bytes::from("true".as_bytes())) + ); + assert_eq!( + list_value.items()[3], + Value::Binary(Bytes::from("3.15".as_bytes())) + ); + } else { + panic!("Expected array to be decoded as ListValue"); + } } #[test] diff --git a/src/datatypes/src/json/value.rs b/src/datatypes/src/json/value.rs index f3b652a549..b99f7ca379 100644 --- a/src/datatypes/src/json/value.rs +++ b/src/datatypes/src/json/value.rs @@ -161,12 +161,18 @@ impl JsonVariant { }; JsonNativeType::Array(Box::new(item_type)) } - JsonVariant::Object(object) => JsonNativeType::Object( - object - .iter() - .map(|(k, v)| (k.clone(), v.native_type())) - .collect(), - ), + JsonVariant::Object(object) => { + if object.is_empty() { + JsonNativeType::Null + } else { + JsonNativeType::Object( + object + .iter() + .map(|(k, v)| (k.clone(), v.native_type())) + .collect(), + ) + } + } JsonVariant::Variant(_) => JsonNativeType::Variant, } } @@ -642,12 +648,18 @@ impl JsonVariantRef<'_> { }; JsonNativeType::Array(Box::new(item_type)) } - JsonVariantRef::Object(object) => JsonNativeType::Object( - object - .iter() - .map(|(k, v)| (k.to_string(), native_type(v))) - .collect(), - ), + JsonVariantRef::Object(object) => { + if object.is_empty() { + JsonNativeType::Null + } else { + JsonNativeType::Object( + object + .iter() + .map(|(k, v)| (k.to_string(), native_type(v))) + .collect(), + ) + } + } JsonVariantRef::Variant(_) => JsonNativeType::Variant, } } diff --git a/src/datatypes/src/types/json_type.rs b/src/datatypes/src/types/json_type.rs index 362357c5e6..29ac015673 100644 --- a/src/datatypes/src/types/json_type.rs +++ b/src/datatypes/src/types/json_type.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::borrow::Cow; use std::collections::BTreeMap; use std::fmt::{Debug, Display, Formatter}; use std::str::FromStr; @@ -115,6 +116,14 @@ impl JsonNativeType { (JsonNativeType::Null, that) => that.clone(), (this, JsonNativeType::Null) => this, (this, that) if this == *that => this, + + // (JsonNativeType::Number(x), JsonNativeType::Number(y)) => { + // JsonNativeType::Number(match (x, y) { + // (x, y) if x == y => *x, + // (JsonNumberType::F64, _) | (_, JsonNumberType::F64) => JsonNumberType::F64, + // _ => JsonNumberType::I64, + // }) + // } _ => JsonNativeType::Variant, }; } @@ -380,6 +389,23 @@ fn plain_json_struct_type(data_type: &ArrowDataType) -> StructType { StructType::new(Arc::new(vec![field])) } +pub fn merge_as_json_type<'a>( + left: &'a ArrowDataType, + right: &ArrowDataType, +) -> Cow<'a, ArrowDataType> { + if left == right { + return Cow::Borrowed(left); + } + + let mut left = JsonType::from(left); + let right = JsonType::from(right); + Cow::Owned(if left.merge(&right).is_ok() { + left.as_arrow_type() + } else { + ArrowDataType::Utf8 + }) +} + impl From<&ArrowDataType> for JsonType { fn from(t: &ArrowDataType) -> Self { JsonType::new_json2(JsonNativeType::from(&ConcreteDataType::from_arrow_type(t))) @@ -811,18 +837,15 @@ mod tests { Ok(r#""""#), )?; - // Identical number categories should stay as Number. test( "1", &mut JsonType::new_json2(JsonNativeType::i64()), Ok(r#""""#), )?; - - // Conflicting number categories should be lifted to Variant. test( "1.5", &mut JsonType::new_json2(JsonNativeType::i64()), - Ok(r#""""#), + Ok(r#""""#), )?; // Object merge should preserve existing fields and append missing fields. diff --git a/src/datatypes/src/vectors/json/array.rs b/src/datatypes/src/vectors/json/array.rs index 75779821c5..10ed121790 100644 --- a/src/datatypes/src/vectors/json/array.rs +++ b/src/datatypes/src/vectors/json/array.rs @@ -24,9 +24,10 @@ use arrow_schema::{DataType, FieldRef}; use serde_json::Value; use snafu::{OptionExt, ResultExt}; -use crate::arrow_array::{StringArray, binary_array_value, string_array_value}; +use crate::arrow_array::{MutableBinaryArray, StringArray, binary_array_value, string_array_value}; use crate::error::{ AlignJsonArraySnafu, ArrowComputeSnafu, DeserializeSnafu, InvalidJsonSnafu, Result, + SerializeSnafu, }; pub struct JsonArray<'a> { @@ -177,7 +178,39 @@ impl JsonArray<'_> { Ok(Arc::new(json_array)) } + fn try_decode_variant(&self) -> Result { + let json_values = (0..self.inner.len()) + .map(|i| self.try_get_value(i)) + .collect::>>()?; + + let serialized_values = json_values + .iter() + .map(|value| { + (!value.is_null()) + .then(|| serde_json::to_vec(value)) + .transpose() + }) + .collect::, _>>() + .context(SerializeSnafu)?; + let total_bytes = serialized_values.iter().flatten().map(Vec::len).sum(); + + let mut builder = MutableBinaryArray::with_capacity(self.inner.len(), total_bytes); + for serialized_value in serialized_values { + if let Some(bytes) = serialized_value { + builder.append_value(bytes); + } else { + builder.append_null(); + } + } + + Ok(Arc::new(builder.finish())) + } + fn try_cast(&self, to_type: &DataType) -> Result { + if matches!(to_type, DataType::Binary) { + return self.try_decode_variant(); + } + if compute::can_cast_types(self.inner.data_type(), to_type) { return compute::cast(&self.inner, to_type).context(ArrowComputeSnafu); } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 16260327f5..a4cd1cebd1 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -567,14 +567,16 @@ impl RegionFlushTask { write_opts: &WriteOptions, mem_ranges: MemtableRanges, ) -> Result { - let batch_schema = to_flat_sst_arrow_schema( - &version.metadata, - &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding), - ); + // let memtable_schema = mem_ranges + // .schema() + // .unwrap_or_else(|| version.metadata.schema.arrow_schema().clone()); + + let options = FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding); + let batch_schema = to_flat_sst_arrow_schema(&version.metadata, &options); let field_column_start = flat_format::field_column_start(&version.metadata, batch_schema.fields().len()); let flat_sources = memtable_flat_sources( - batch_schema, + batch_schema.clone(), mem_ranges, &version.options, field_column_start, diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 879bc88f10..07ed5e8c13 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -14,6 +14,7 @@ //! Memtables are write buffers for regions. +use std::borrow::Cow; use std::collections::BTreeMap; use std::fmt; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -60,6 +61,10 @@ pub use bulk::part::{ BulkPart, BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size, sort_primary_key_record_batch, }; +use datatypes::arrow::datatypes::{Schema, SchemaRef}; +use datatypes::extension::json; +use datatypes::schema::ext::ArrowSchemaExt; +use datatypes::types::json_type; #[cfg(any(test, feature = "test"))] pub use time_partition::filter_record_batch; @@ -217,6 +222,56 @@ impl MemtableRanges { .max() .unwrap_or(0) } + + #[expect(unused)] + pub(crate) fn schema(&self) -> Option { + let mut schemas = self + .ranges + .values() + .filter_map(|x| x.record_batch_schema()) + .collect::>(); + + if schemas.iter().all(|x| !x.has_json_extension_field()) { + // If there are no JSON extension fields in any schemas, the invariant must be hold, + // that all schemas are same (they are all derived from same region metadata). + // So it's ok to return the first one as the schema of the whole memtable ranges. + return (!schemas.is_empty()).then(|| schemas.swap_remove(0)); + } + + // If there are JSON extension fields, by convention, only their concrete data types + // (Arrow's Struct) may differ. Other things like the metadata or the fields count are same. + // So to produce the final schema, we can solely merge the data types. + schemas + .split_first() + .map(|(first, rest)| merge_json_extension_fields(first, rest)) + } +} + +pub(crate) fn merge_json_extension_fields(base: &SchemaRef, others: &[SchemaRef]) -> SchemaRef { + let mut fields = base.fields().iter().cloned().collect::>(); + for (i, field) in fields.iter_mut().enumerate() { + if !json::is_json_extension_type(field) { + continue; + } + + let merged = others + .iter() + .map(|x| Cow::Borrowed(x.field(i).data_type())) + .reduce(|acc, e| { + Cow::Owned(json_type::merge_as_json_type(acc.as_ref(), e.as_ref()).into_owned()) + }); + if let Some(merged) = merged + && field.data_type() != merged.as_ref() + { + let merged = + json_type::merge_as_json_type(field.data_type(), merged.as_ref()).into_owned(); + + let mut new = field.as_ref().clone(); + new.set_data_type(merged); + *field = Arc::new(new); + } + } + Arc::new(Schema::new_with_metadata(fields, base.metadata().clone())) } impl IterBuilder for MemtableRanges { @@ -557,6 +612,11 @@ pub trait IterBuilder: Send + Sync { .fail() } + /// Returns the schema of record batches produced by this iterator. + fn record_batch_schema(&self) -> Option { + None + } + /// Returns the [EncodedRange] if the range is already encoded into SST. fn encoded_range(&self) -> Option { None @@ -734,6 +794,11 @@ impl MemtableRange { self.context.builder.is_record_batch() } + /// Returns the schema of record batches if this range supports record batch iteration. + pub fn record_batch_schema(&self) -> Option { + self.context.builder.record_batch_schema() + } + pub fn num_rows(&self) -> usize { self.stats.num_rows } diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs index 04e0a5e3da..78631b2090 100644 --- a/src/mito2/src/memtable/bulk.rs +++ b/src/mito2/src/memtable/bulk.rs @@ -835,6 +835,10 @@ impl IterBuilder for BulkRangeIterBuilder { fn encoded_range(&self) -> Option { None } + + fn record_batch_schema(&self) -> Option { + Some(self.part.batch.schema()) + } } impl IterBuilder for MultiBulkRangeIterBuilder { @@ -867,6 +871,10 @@ impl IterBuilder for MultiBulkRangeIterBuilder { fn encoded_range(&self) -> Option { None } + + fn record_batch_schema(&self) -> Option { + self.part.record_batch_schema() + } } /// Iterator builder for encoded bulk range diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index e094b662f9..c221ffcebf 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -433,7 +433,7 @@ impl UnorderedPart { return Ok(Some(self.parts[0].batch.clone())); } - // Get the schema from the first part + // Get the schema from the first part and normalize JSON2 columns across all parts. let schema = self.parts[0].batch.schema(); let concatenated = if schema.has_json_extension_field() { let (schema, batches) = align_parts(&self.parts)?; @@ -1608,6 +1608,11 @@ impl MultiBulkPart { self.series_count } + /// Returns the schema of batches in this part. + pub(crate) fn record_batch_schema(&self) -> Option { + self.batches.first().map(|batch| batch.schema()) + } + /// Returns the number of record batches in this part. pub fn num_batches(&self) -> usize { self.batches.len() diff --git a/src/mito2/src/read/flat_merge.rs b/src/mito2/src/read/flat_merge.rs index b1c304f244..d23ad68960 100644 --- a/src/mito2/src/read/flat_merge.rs +++ b/src/mito2/src/read/flat_merge.rs @@ -26,12 +26,14 @@ use datatypes::arrow::datatypes::{ArrowNativeType, BinaryType, DataType, SchemaR use datatypes::arrow::error::ArrowError; use datatypes::arrow::record_batch::RecordBatch; use datatypes::arrow_array::BinaryArray; +use datatypes::extension::json::is_json_extension_type; use datatypes::timestamp::timestamp_array_to_primitive; +use datatypes::vectors::json::array::JsonArray; use futures::{Stream, TryStreamExt}; use snafu::ResultExt; use store_api::storage::SequenceNumber; -use crate::error::{ComputeArrowSnafu, Result}; +use crate::error::{ComputeArrowSnafu, DataTypeMismatchSnafu, Result}; use crate::memtable::BoxedRecordBatchIterator; use crate::metrics::READ_STAGE_ELAPSED; use crate::read::BoxedRecordBatchStream; @@ -258,14 +260,29 @@ impl BatchBuilder { check_interleave_overflow(&self.batches, &self.schema, &self.indices)?; - let columns = (0..self.schema.fields.len()) - .map(|column_idx| { - let arrays: Vec<_> = self + let columns = self + .schema + .fields() + .iter() + .enumerate() + .map(|(column_idx, field)| { + let arrays = self .batches .iter() - .map(|(_, batch)| batch.column(column_idx).as_ref()) - .collect(); - interleave(&arrays, &self.indices).context(ComputeArrowSnafu) + .map(|(_, batch)| { + let column = batch.column(column_idx); + let column = if is_json_extension_type(field) { + JsonArray::from(column) + .try_align(field.data_type()) + .context(DataTypeMismatchSnafu)? + } else { + column.clone() + }; + Ok(column) + }) + .collect::>>()?; + let aligned = arrays.iter().map(|x| x.as_ref()).collect::>(); + interleave(&aligned, &self.indices).context(ComputeArrowSnafu) }) .collect::>>()?; diff --git a/src/mito2/src/read/flat_projection.rs b/src/mito2/src/read/flat_projection.rs index 6dfbab4a0b..6d83d930b1 100644 --- a/src/mito2/src/read/flat_projection.rs +++ b/src/mito2/src/read/flat_projection.rs @@ -51,6 +51,7 @@ use crate::sst::{ /// /// This mapper support duplicate and unsorted projection indices. /// The output schema is determined by the projection indices. +#[derive(Clone)] pub struct FlatProjectionMapper { /// Metadata of the region. metadata: RegionMetadataRef, diff --git a/src/mito2/src/read/range.rs b/src/mito2/src/read/range.rs index d667be9cb8..6b4de71db3 100644 --- a/src/mito2/src/read/range.rs +++ b/src/mito2/src/read/range.rs @@ -15,6 +15,7 @@ //! Structs for partition ranges. use common_time::Timestamp; +use datatypes::arrow::datatypes::SchemaRef; use smallvec::{SmallVec, smallvec}; use store_api::region_engine::PartitionRange; use store_api::storage::TimeSeriesDistribution; @@ -478,6 +479,12 @@ impl MemRangeBuilder { pub(crate) fn stats(&self) -> &MemtableStats { &self.stats } + + /// Returns the record batch schema for this memtable range if available. + #[expect(unused)] + pub(crate) fn record_batch_schema(&self) -> Option { + self.range.record_batch_schema() + } } #[cfg(test)] diff --git a/src/mito2/src/sst/parquet/flat_format.rs b/src/mito2/src/sst/parquet/flat_format.rs index 7c448b0f63..12bc891e3a 100644 --- a/src/mito2/src/sst/parquet/flat_format.rs +++ b/src/mito2/src/sst/parquet/flat_format.rs @@ -106,6 +106,12 @@ impl FlatWriteFormat { let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()])); columns[sequence_column_index(batch.num_columns())] = sequence_array; + // let columns = common_recordbatch::recordbatch::maybe_align_json_array_with_schema( + // &self.arrow_schema, + // columns, + // ) + // .context(RecordBatchSnafu)?; + // RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu) RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu) } } diff --git a/tests-integration/tests/jsonbench.rs b/tests-integration/tests/jsonbench.rs index 55cfcd53f0..02692e7e82 100644 --- a/tests-integration/tests/jsonbench.rs +++ b/tests-integration/tests/jsonbench.rs @@ -153,16 +153,10 @@ async fn query_data(frontend: &Arc) -> io::Result<()> { +----------+"#; execute_sql_and_expect(frontend, sql, expected).await; - let sql = "SELECT * FROM bluesky ORDER BY time_us"; - let expected = fs::read_to_string(find_workspace_path( - "tests-integration/resources/jsonbench-select-all.txt", - ))?; - execute_sql_and_expect(frontend, sql, &expected).await; - // query 1: let sql = " SELECT - json_get_string(data, '$.commit.collection') AS event, count() AS count + data.commit.collection AS event, count() AS count FROM bluesky GROUP BY event ORDER BY count DESC, event ASC"; @@ -180,13 +174,12 @@ ORDER BY count DESC, event ASC"; // query 2: let sql = " SELECT - json_get_string(data, '$.commit.collection') AS event, + data.commit.collection AS event, count() AS count, - count(DISTINCT json_get_string(data, '$.did')) AS users + count(DISTINCT data.did) AS users FROM bluesky WHERE - (json_get_string(data, '$.kind') = 'commit') AND - (json_get_string(data, '$.commit.operation') = 'create') + data.kind = 'commit' AND data.commit.operation = 'create' GROUP BY event ORDER BY count DESC, event ASC"; let expected = r#" @@ -203,15 +196,14 @@ ORDER BY count DESC, event ASC"; // query 3: let sql = " SELECT - json_get_string(data, '$.commit.collection') AS event, - date_part('hour', to_timestamp_micros(json_get_int(data, '$.time_us'))) as hour_of_day, + data.commit.collection AS event, + date_part('hour', to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) as hour_of_day, count() AS count FROM bluesky WHERE - (json_get_string(data, '$.kind') = 'commit') AND - (json_get_string(data, '$.commit.operation') = 'create') AND - json_get_string(data, '$.commit.collection') IN - ('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like') + data.kind = 'commit' AND + data.commit.operation = 'create' AND + data.commit.collection in ('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like') GROUP BY event, hour_of_day ORDER BY hour_of_day, event"; let expected = r#" @@ -227,13 +219,13 @@ ORDER BY hour_of_day, event"; // query 4: let sql = " SELECT - json_get_string(data, '$.did') as user_id, - min(to_timestamp_micros(json_get_int(data, '$.time_us'))) AS first_post_ts + data.did::String as user_id, + min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) AS first_post_ts FROM bluesky WHERE - (json_get_string(data, '$.kind') = 'commit') AND - (json_get_string(data, '$.commit.operation') = 'create') AND - (json_get_string(data, '$.commit.collection') = 'app.bsky.feed.post') + data.kind = 'commit' AND + data.commit.operation = 'create' AND + data.commit.collection = 'app.bsky.feed.post' GROUP BY user_id ORDER BY first_post_ts ASC, user_id DESC LIMIT 3"; @@ -250,17 +242,17 @@ LIMIT 3"; // query 5: let sql = " SELECT - json_get_string(data, '$.did') as user_id, + data.did::String as user_id, date_part( 'epoch', - max(to_timestamp_micros(json_get_int(data, '$.time_us'))) - - min(to_timestamp_micros(json_get_int(data, '$.time_us'))) + max(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) - + min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) ) AS activity_span FROM bluesky WHERE - (json_get_string(data, '$.kind') = 'commit') AND - (json_get_string(data, '$.commit.operation') = 'create') AND - (json_get_string(data, '$.commit.collection') = 'app.bsky.feed.post') + data.kind = 'commit' AND + data.commit.operation = 'create' AND + data.commit.collection = 'app.bsky.feed.post' GROUP BY user_id ORDER BY activity_span DESC, user_id DESC LIMIT 3"; @@ -304,30 +296,21 @@ async fn insert_data_by_sql(frontend: &Arc) -> io::Result<()> { async fn desc_table(frontend: &Arc) { let sql = "DESC TABLE bluesky"; let expected = r#" -+---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+ -| Column | Type | Key | Null | Default | Semantic Type | -+---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+ -| data | Json<{"_raw":"","commit.collection":"","commit.operation":"","did":"","kind":"","time_us":""}> | | YES | | FIELD | -| time_us | TimestampMicrosecond | PRI | NO | | TIMESTAMP | -+---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+"#; ++---------+----------------------+-----+------+---------+---------------+ +| Column | Type | Key | Null | Default | Semantic Type | ++---------+----------------------+-----+------+---------+---------------+ +| data | JSON2 | | YES | | FIELD | +| time_us | TimestampMicrosecond | PRI | NO | | TIMESTAMP | ++---------+----------------------+-----+------+---------+---------------+"#; execute_sql_and_expect(frontend, sql, expected).await; } async fn create_table(frontend: &Arc) { let sql = r#" CREATE TABLE bluesky ( - "data" JSON ( - format = "partial", - fields = Struct< - kind String, - "commit.operation" String, - "commit.collection" String, - did String, - time_us Bigint - >, - ), + "data" JSON2, time_us TimestampMicrosecond TIME INDEX, -) +) WITH ('append_mode' = 'true', 'sst_format' = 'flat') "#; execute_sql_and_expect(frontend, sql, "Affected Rows: 0").await; } diff --git a/tests/cases/standalone/common/types/json/json2.result b/tests/cases/standalone/common/types/json/json2.result index fdae802f3b..3b8a9eb21d 100644 --- a/tests/cases/standalone/common/types/json/json2.result +++ b/tests/cases/standalone/common/types/json/json2.result @@ -117,6 +117,23 @@ select j.a.b from json2_table order by ts; | 10 | +-------------------------------------+ +select j.a, j.a.x from json2_table order by ts; + ++--------------------------------------------------+----------------------------------------------------+ +| json_get(json2_table.j,Utf8("a"),Utf8View(NULL)) | json_get(json2_table.j,Utf8("a.x"),Utf8View(NULL)) | ++--------------------------------------------------+----------------------------------------------------+ +| {"b":1,"x":null} | | +| {"b":-2,"x":null} | | +| {"b":3,"x":null} | | +| {"b":-4,"x":null} | | +| {"b":null,"x":null} | | +| | | +| {"b":"s7","x":null} | | +| {"b":8,"x":null} | | +| {"b":null,"x":true} | true | +| {"b":10,"x":null} | | ++--------------------------------------------------+----------------------------------------------------+ + select j.c, j.y from json2_table order by ts; +-----------------------------------+-----------------------------------+ @@ -134,6 +151,44 @@ select j.c, j.y from json2_table order by ts; | | false | +-----------------------------------+-----------------------------------+ +select j from json2_table order by ts; + +Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected Struct() but found Struct("a": Struct("b": Binary, "x": Boolean), "c": Binary, "d": List(Struct("e": Struct("f": Float64, "g": Float64))), "y": Boolean) at column index 0 + +select * from json2_table order by ts; + +Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected Struct() but found Struct("a": Struct("b": Binary, "x": Boolean), "c": Binary, "d": List(Struct("e": Struct("f": Float64, "g": Float64))), "y": Boolean) at column index 1 + +select j.a.b + 1 from json2_table order by ts; + ++------------------------------------------------------------+ +| json_get(json2_table.j,Utf8("a.b"),Int64(NULL)) + Int64(1) | ++------------------------------------------------------------+ +| 2 | +| -1 | +| 4 | +| -3 | +| | +| | +| | +| 9 | +| | +| 11 | ++------------------------------------------------------------+ + +select abs(j.a.b) from json2_table order by ts; + +Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Function 'abs' expects NativeType::Numeric but received NativeType::String No function matches the given name and argument types 'abs(Utf8View)'. You might need to add explicit type casts. + Candidate functions: + abs(Numeric(1)) + +-- "j.c" is of type "String", "abs" is expected to be all "null"s. +select abs(j.c) from json2_table order by ts; + +Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Function 'abs' expects NativeType::Numeric but received NativeType::String No function matches the given name and argument types 'abs(Utf8View)'. You might need to add explicit type casts. + Candidate functions: + abs(Numeric(1)) + select j.d from json2_table order by ts; +-----------------------------------+ diff --git a/tests/cases/standalone/common/types/json/json2.sql b/tests/cases/standalone/common/types/json/json2.sql index 57e113f8be..cb8df2f8b9 100644 --- a/tests/cases/standalone/common/types/json/json2.sql +++ b/tests/cases/standalone/common/types/json/json2.sql @@ -42,8 +42,21 @@ explain select j.a.x::bool from json2_table; select j.a.b from json2_table order by ts; +select j.a, j.a.x from json2_table order by ts; + select j.c, j.y from json2_table order by ts; +select j from json2_table order by ts; + +select * from json2_table order by ts; + +select j.a.b + 1 from json2_table order by ts; + +select abs(j.a.b) from json2_table order by ts; + +-- "j.c" is of type "String", "abs" is expected to be all "null"s. +select abs(j.c) from json2_table order by ts; + select j.d from json2_table order by ts; drop table json2_table;