From 44856f0041487a005ceb8b14506dfbe783f50bf6 Mon Sep 17 00:00:00 2001 From: luofucong Date: Tue, 3 Mar 2026 17:34:05 +0800 Subject: [PATCH] json2 (?) query-driven and data-driven concretize (?) select (?) compaction Signed-off-by: luofucong --- Cargo.lock | 1 + src/cmd/src/datanode/objbench.rs | 8 +- .../function/src/scalars/json/json_get.rs | 4 +- src/common/recordbatch/src/recordbatch.rs | 2 +- src/datanode/src/lib.rs | 2 + src/datatypes/src/json.rs | 121 +++++++--- src/datatypes/src/json/requirement.rs | 77 ++++++ src/datatypes/src/json/value.rs | 36 ++- src/datatypes/src/types/json_type.rs | 32 ++- src/datatypes/src/vectors/json/array.rs | 35 ++- src/mito2/Cargo.toml | 1 + src/mito2/benches/memtable_bench.rs | 2 + src/mito2/src/access_layer.rs | 4 +- src/mito2/src/cache/write_cache.rs | 7 +- src/mito2/src/compaction.rs | 14 +- src/mito2/src/compaction/compactor.rs | 4 +- src/mito2/src/flush.rs | 17 +- src/mito2/src/memtable.rs | 64 +++++ src/mito2/src/memtable/bulk.rs | 8 + src/mito2/src/memtable/bulk/part.rs | 9 +- src/mito2/src/read.rs | 20 ++ src/mito2/src/read/flat_merge.rs | 31 ++- src/mito2/src/read/flat_projection.rs | 25 +- src/mito2/src/read/range.rs | 6 + src/mito2/src/read/scan_region.rs | 162 ++++++++++++- src/mito2/src/read/seq_scan.rs | 2 +- src/mito2/src/sst.rs | 3 + src/mito2/src/sst/parquet.rs | 8 +- src/mito2/src/sst/parquet/flat_format.rs | 12 +- src/mito2/src/sst/parquet/format.rs | 6 +- src/mito2/src/sst/parquet/reader.rs | 3 +- src/mito2/src/sst/parquet/writer.rs | 37 ++- src/mito2/src/test_util/sst_util.rs | 9 +- src/query/src/datafusion.rs | 1 + .../src/datafusion/json2_expr_planner.rs | 131 ++++++++++ src/query/src/datafusion/planner.rs | 6 +- src/query/src/dummy_catalog.rs | 6 + src/query/src/optimizer.rs | 1 + src/query/src/optimizer/json2_scan_hint.rs | 225 ++++++++++++++++++ src/query/src/query_engine/state.rs | 2 + src/store-api/src/storage/requests.rs | 12 + tests-integration/tests/jsonbench.rs | 73 +++--- .../common/tql-explain-analyze/explain.result | 4 + .../standalone/common/types/json/json2.result | 114 +++++++++ .../standalone/common/types/json/json2.sql | 21 ++ .../common/types/json/jsonbench.result | 180 ++++++++++++++ .../common/types/json/jsonbench.sql | 92 +++++++ 47 files changed, 1470 insertions(+), 170 deletions(-) create mode 100644 src/datatypes/src/json/requirement.rs create mode 100644 src/query/src/datafusion/json2_expr_planner.rs create mode 100644 src/query/src/optimizer/json2_scan_hint.rs create mode 100644 tests/cases/standalone/common/types/json/jsonbench.result create mode 100644 tests/cases/standalone/common/types/json/jsonbench.sql diff --git a/Cargo.lock b/Cargo.lock index 29dc40ff85..2db1b77f75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8168,6 +8168,7 @@ version = "1.0.0" dependencies = [ "api", "aquamarine", + "arrow-schema 57.3.0", "async-channel 1.9.0", "async-stream", "async-trait", diff --git a/src/cmd/src/datanode/objbench.rs b/src/cmd/src/datanode/objbench.rs index d8793bee2f..55bcbd4e65 100644 --- a/src/cmd/src/datanode/objbench.rs +++ b/src/cmd/src/datanode/objbench.rs @@ -26,7 +26,7 @@ use mito2::access_layer::{ }; use mito2::cache::{CacheManager, CacheManagerRef}; use mito2::config::{FulltextIndexConfig, MitoConfig, Mode}; -use mito2::read::FlatSource; +use mito2::read::{FlatSource, RecordBatchSource}; use mito2::sst::FormatType; use mito2::sst::file::{FileHandle, FileMeta}; use mito2::sst::file_purger::{FilePurger, FilePurgerRef}; @@ -244,10 +244,14 @@ impl ObjbenchCommand { ..Default::default() }; + let source = RecordBatchSource::new( + region_meta.schema.arrow_schema().clone(), + FlatSource::Stream(reader_stream), + ); let write_req = SstWriteRequest { op_type: OperationType::Flush, metadata: region_meta, - source: FlatSource::Stream(reader_stream), + source, cache_manager, storage: None, max_sequence: None, diff --git a/src/common/function/src/scalars/json/json_get.rs b/src/common/function/src/scalars/json/json_get.rs index 00f528d0a9..a12ca5691d 100644 --- a/src/common/function/src/scalars/json/json_get.rs +++ b/src/common/function/src/scalars/json/json_get.rs @@ -439,12 +439,12 @@ fn json_struct_get(array: &ArrayRef, path: &str, with_type: Option<&DataType>) - /// use the third argument's type to determine the return type. #[derive(Debug, Display)] #[display("{}", Self::NAME.to_ascii_uppercase())] -pub(super) struct JsonGetWithType { +pub struct JsonGetWithType { signature: Signature, } impl JsonGetWithType { - pub(crate) const NAME: &'static str = "json_get"; + pub const NAME: &'static str = "json_get"; } impl Default for JsonGetWithType { diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs index 4289714afd..413d89fb78 100644 --- a/src/common/recordbatch/src/recordbatch.rs +++ b/src/common/recordbatch/src/recordbatch.rs @@ -355,7 +355,7 @@ pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Resul Ok(RecordBatch::from_df_record_batch(schema, record_batch)) } -fn maybe_align_json_array_with_schema( +pub fn maybe_align_json_array_with_schema( schema: &ArrowSchemaRef, arrays: Vec, ) -> Result> { diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index 7e0db3cabc..f1c33a4d3b 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![recursion_limit = "256"] + pub mod alive_keeper; pub mod config; pub mod datanode; diff --git a/src/datatypes/src/json.rs b/src/datatypes/src/json.rs index db657abbcb..0555649832 100644 --- a/src/datatypes/src/json.rs +++ b/src/datatypes/src/json.rs @@ -19,6 +19,7 @@ //! The struct will carry all the fields of the Json object. We will not flatten any json object in this implementation. //! +pub mod requirement; pub mod value; use std::collections::{BTreeMap, HashSet}; @@ -26,12 +27,12 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value as Json}; -use snafu::{OptionExt, ResultExt, ensure}; +use snafu::{OptionExt, ResultExt}; use crate::error::{self, InvalidJsonSnafu, Result, SerializeSnafu}; use crate::json::value::{JsonValue, JsonVariant}; use crate::types::json_type::{JsonNativeType, JsonNumberType, JsonObjectType}; -use crate::types::{StructField, StructType}; +use crate::types::{JsonType, StructField, StructType}; use crate::value::{ListValue, StructValue, Value}; /// The configuration of JSON encoding @@ -305,32 +306,45 @@ fn encode_json_array_with_context<'a>( ) -> Result { let json_array_len = json_array.len(); let mut items = Vec::with_capacity(json_array_len); - let mut element_type = item_type.cloned(); for (index, value) in json_array.into_iter().enumerate() { let array_context = context.with_key(&index.to_string()); - let item_value = - encode_json_value_with_context(value, element_type.as_ref(), &array_context)?; - let item_type = item_value.json_type().native_type().clone(); - items.push(item_value.into_variant()); + let item_value = encode_json_value_with_context(value, None, &array_context)?; + items.push(item_value); + } - // Determine the common type for the list - if let Some(current_type) = &element_type { - // It's valid for json array to have different types of items, for example, - // ["a string", 1]. However, the `JsonValue` will be converted to Arrow list array, - // which requires all items have exactly same type. So we forbid the different types - // case here. Besides, it's not common for items in a json array to differ. So I think - // we are good here. - ensure!( - item_type == *current_type, - error::InvalidJsonSnafu { - value: "all items in json array must have the same type" - } - ); - } else { - element_type = Some(item_type); + // In specification, it's valid for a JSON array to have different types of items, for example, + // ["a string", 1]. However, in implementation, the `JsonValue` will be converted to Arrow list + // array, which requires all items have exactly the same type. So we merge out the maybe + // different item types to a unified type, and align all the item values to it. + + let provided_item_type = item_type.map(|x| JsonType::new_json2(x.clone())); + let merged_item_type = if let Some((first, rests)) = items.split_first() { + let mut merged = first.json_type().clone(); + for rest in rests.iter().map(|x| x.json_type()) { + merged.merge(rest)?; + } + Some(merged) + } else { + None + }; + let unified_item_type = match (provided_item_type, merged_item_type) { + (Some(mut x), Some(y)) => { + x.merge(&y)?; + Some(x) + } + (Some(x), None) | (None, Some(x)) => Some(x), + (None, None) => None, + }; + if let Some(unified_item_type) = unified_item_type { + for item in &mut items { + item.try_align(&unified_item_type)?; } } + let items = items + .into_iter() + .map(|x| x.into_variant()) + .collect::>(); Ok(JsonValue::new(JsonVariant::Array(items))) } @@ -729,7 +743,7 @@ where #[cfg(test)] mod tests { - + use common_base::bytes::Bytes; use serde_json::json; use super::*; @@ -1050,11 +1064,21 @@ mod tests { fn test_encode_json_array_mixed_types() { let json = json!([1, "hello", true, 3.15]); let settings = JsonStructureSettings::Structured(None); - let result = settings.encode_with_type(json, None); - assert_eq!( - result.unwrap_err().to_string(), - "Invalid JSON: all items in json array must have the same type" - ); + let result = settings + .encode_with_type(json, None) + .unwrap() + .into_json_inner() + .unwrap(); + + if let Value::List(list_value) = result { + assert_eq!(list_value.items().len(), 4); + assert_eq!( + list_value.datatype(), + Arc::new(ConcreteDataType::binary_datatype()) + ); + } else { + panic!("Expected List value"); + } } #[test] @@ -1276,12 +1300,12 @@ mod tests { #[test] fn test_encode_json_array_with_item_type() { let json = json!([1, 2, 3]); - let item_type = Arc::new(ConcreteDataType::uint64_datatype()); + let item_type = Arc::new(ConcreteDataType::int64_datatype()); let settings = JsonStructureSettings::Structured(None); let result = settings .encode_with_type( json, - Some(&JsonNativeType::Array(Box::new(JsonNativeType::u64()))), + Some(&JsonNativeType::Array(Box::new(JsonNativeType::i64()))), ) .unwrap() .into_json_inner() @@ -1289,9 +1313,9 @@ mod tests { if let Value::List(list_value) = result { assert_eq!(list_value.items().len(), 3); - assert_eq!(list_value.items()[0], Value::UInt64(1)); - assert_eq!(list_value.items()[1], Value::UInt64(2)); - assert_eq!(list_value.items()[2], Value::UInt64(3)); + assert_eq!(list_value.items()[0], Value::Int64(1)); + assert_eq!(list_value.items()[1], Value::Int64(2)); + assert_eq!(list_value.items()[2], Value::Int64(3)); assert_eq!(list_value.datatype(), item_type); } else { panic!("Expected List value"); @@ -2249,11 +2273,32 @@ mod tests { )])), ); - let decoded_struct = settings.decode_struct(array_struct); - assert_eq!( - decoded_struct.unwrap_err().to_string(), - "Invalid JSON: all items in json array must have the same type" - ); + let decoded_struct = settings.decode_struct(array_struct).unwrap(); + let fields = decoded_struct.struct_type().fields(); + let decoded_fields: Vec<&str> = fields.iter().map(|f| f.name()).collect(); + assert!(decoded_fields.contains(&"value")); + + if let Value::List(list_value) = &decoded_struct.items()[0] { + assert_eq!(list_value.items().len(), 4); + assert_eq!( + list_value.items()[0], + Value::Binary(Bytes::from("1".as_bytes())) + ); + assert_eq!( + list_value.items()[1], + Value::Binary(Bytes::from(r#""hello""#.as_bytes())) + ); + assert_eq!( + list_value.items()[2], + Value::Binary(Bytes::from("true".as_bytes())) + ); + assert_eq!( + list_value.items()[3], + Value::Binary(Bytes::from("3.15".as_bytes())) + ); + } else { + panic!("Expected array to be decoded as ListValue"); + } } #[test] diff --git a/src/datatypes/src/json/requirement.rs b/src/datatypes/src/json/requirement.rs new file mode 100644 index 0000000000..5572aa0b66 --- /dev/null +++ b/src/datatypes/src/json/requirement.rs @@ -0,0 +1,77 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; +use std::sync::Arc; + +use crate::data_type::ConcreteDataType; +use crate::types::{StructField, StructType}; + +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct JsonPathTarget { + root: JsonPathTargetNode, +} + +#[derive(Debug, Clone, Default, PartialEq, Eq)] +struct JsonPathTargetNode { + children: BTreeMap, + leaf_type: Option, +} + +impl JsonPathTarget { + pub fn require_typed_path(&mut self, path: &str, data_type: ConcreteDataType) { + let mut current = &mut self.root; + for segment in path.split('.') { + current = current.children.entry(segment.to_string()).or_default(); + } + current.require_leaf_type(data_type); + } + + pub fn is_empty(&self) -> bool { + self.root.children.is_empty() + } + + pub fn build_type(&self) -> Option { + if self.is_empty() { + None + } else { + Some(ConcreteDataType::Struct(self.root.build_struct_type())) + } + } +} + +impl JsonPathTargetNode { + fn require_leaf_type(&mut self, data_type: ConcreteDataType) { + self.leaf_type = Some(data_type); + } + + fn build_data_type(&self) -> ConcreteDataType { + if self.children.is_empty() { + self.leaf_type + .clone() + .unwrap_or_else(ConcreteDataType::string_datatype) + } else { + ConcreteDataType::Struct(self.build_struct_type()) + } + } + + fn build_struct_type(&self) -> StructType { + let fields = self + .children + .iter() + .map(|(name, child)| StructField::new(name.clone(), child.build_data_type(), true)) + .collect::>(); + StructType::new(Arc::new(fields)) + } +} diff --git a/src/datatypes/src/json/value.rs b/src/datatypes/src/json/value.rs index e2422ca78d..97a8a7fb77 100644 --- a/src/datatypes/src/json/value.rs +++ b/src/datatypes/src/json/value.rs @@ -161,12 +161,18 @@ impl JsonVariant { }; JsonNativeType::Array(Box::new(item_type)) } - JsonVariant::Object(object) => JsonNativeType::Object( - object - .iter() - .map(|(k, v)| (k.clone(), v.native_type())) - .collect(), - ), + JsonVariant::Object(object) => { + if object.is_empty() { + JsonNativeType::Null + } else { + JsonNativeType::Object( + object + .iter() + .map(|(k, v)| (k.clone(), v.native_type())) + .collect(), + ) + } + } JsonVariant::Variant(_) => JsonNativeType::Variant, } } @@ -639,12 +645,18 @@ impl JsonVariantRef<'_> { }; JsonNativeType::Array(Box::new(item_type)) } - JsonVariantRef::Object(object) => JsonNativeType::Object( - object - .iter() - .map(|(k, v)| (k.to_string(), native_type(v))) - .collect(), - ), + JsonVariantRef::Object(object) => { + if object.is_empty() { + JsonNativeType::Null + } else { + JsonNativeType::Object( + object + .iter() + .map(|(k, v)| (k.to_string(), native_type(v))) + .collect(), + ) + } + } JsonVariantRef::Variant(_) => JsonNativeType::Variant, } } diff --git a/src/datatypes/src/types/json_type.rs b/src/datatypes/src/types/json_type.rs index c2d9961db8..4ccf393446 100644 --- a/src/datatypes/src/types/json_type.rs +++ b/src/datatypes/src/types/json_type.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::borrow::Cow; use std::collections::BTreeMap; use std::fmt::{Debug, Display, Formatter}; use std::str::FromStr; @@ -313,10 +314,36 @@ fn merge(this: &JsonNativeType, that: &JsonNativeType) -> JsonNativeType { JsonNativeType::Object(merge_object(this, that)) } (JsonNativeType::Null, x) | (x, JsonNativeType::Null) => x.clone(), + + (JsonNativeType::Number(x), JsonNativeType::Number(y)) => { + JsonNativeType::Number(match (x, y) { + (x, y) if x == y => *x, + (JsonNumberType::F64, _) | (_, JsonNumberType::F64) => JsonNumberType::F64, + _ => JsonNumberType::I64, + }) + } + _ => JsonNativeType::Variant, } } +pub fn merge_as_json_type<'a>( + left: &'a ArrowDataType, + right: &ArrowDataType, +) -> Cow<'a, ArrowDataType> { + if left == right { + return Cow::Borrowed(left); + } + + let mut left = JsonType::from(left); + let right = JsonType::from(right); + Cow::Owned(if left.merge(&right).is_ok() { + left.as_arrow_type() + } else { + ArrowDataType::Utf8 + }) +} + impl From<&ArrowDataType> for JsonType { fn from(t: &ArrowDataType) -> Self { JsonType::new_json2(JsonNativeType::from(&ConcreteDataType::from_arrow_type(t))) @@ -748,18 +775,15 @@ mod tests { Ok(r#""""#), )?; - // Identical number categories should stay as Number. test( "1", &mut JsonType::new_json2(JsonNativeType::i64()), Ok(r#""""#), )?; - - // Conflicting number categories should be lifted to Variant. test( "1.5", &mut JsonType::new_json2(JsonNativeType::i64()), - Ok(r#""""#), + Ok(r#""""#), )?; // Object merge should preserve existing fields and append missing fields. diff --git a/src/datatypes/src/vectors/json/array.rs b/src/datatypes/src/vectors/json/array.rs index 43e90f258b..7de52accba 100644 --- a/src/datatypes/src/vectors/json/array.rs +++ b/src/datatypes/src/vectors/json/array.rs @@ -24,9 +24,10 @@ use arrow_schema::{DataType, FieldRef}; use serde_json::Value; use snafu::{OptionExt, ResultExt, ensure}; -use crate::arrow_array::{StringArray, binary_array_value, string_array_value}; +use crate::arrow_array::{MutableBinaryArray, StringArray, binary_array_value, string_array_value}; use crate::error::{ AlignJsonArraySnafu, ArrowComputeSnafu, DeserializeSnafu, InvalidJsonSnafu, Result, + SerializeSnafu, }; pub struct JsonArray<'a> { @@ -195,7 +196,39 @@ impl JsonArray<'_> { Ok(Arc::new(json_array)) } + fn try_decode_variant(&self) -> Result { + let json_values = (0..self.inner.len()) + .map(|i| self.try_get_value(i)) + .collect::>>()?; + + let serialized_values = json_values + .iter() + .map(|value| { + (!value.is_null()) + .then(|| serde_json::to_vec(value)) + .transpose() + }) + .collect::, _>>() + .context(SerializeSnafu)?; + let total_bytes = serialized_values.iter().flatten().map(Vec::len).sum(); + + let mut builder = MutableBinaryArray::with_capacity(self.inner.len(), total_bytes); + for serialized_value in serialized_values { + if let Some(bytes) = serialized_value { + builder.append_value(bytes); + } else { + builder.append_null(); + } + } + + Ok(Arc::new(builder.finish())) + } + fn try_cast(&self, to_type: &DataType) -> Result { + if matches!(to_type, DataType::Binary) { + return self.try_decode_variant(); + } + if compute::can_cast_types(self.inner.data_type(), to_type) { return compute::cast(&self.inner, to_type).context(ArrowComputeSnafu); } diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index c5192f1360..964f6eedea 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -16,6 +16,7 @@ workspace = true [dependencies] api.workspace = true aquamarine.workspace = true +arrow-schema.workspace = true async-channel = "1.9" common-stat.workspace = true async-stream.workspace = true diff --git a/src/mito2/benches/memtable_bench.rs b/src/mito2/benches/memtable_bench.rs index 8336625e3c..e6881a766a 100644 --- a/src/mito2/benches/memtable_bench.rs +++ b/src/mito2/benches/memtable_bench.rs @@ -229,6 +229,7 @@ fn bulk_part_converter(c: &mut Criterion) { &FlatSchemaOptions { raw_pk_columns: false, string_pk_use_dict: false, + ..Default::default() }, ); let mut converter = BulkPartConverter::new(&metadata, schema, rows, codec, false); @@ -255,6 +256,7 @@ fn bulk_part_converter(c: &mut Criterion) { &FlatSchemaOptions { raw_pk_columns: true, string_pk_use_dict: true, + ..Default::default() }, ); let mut converter = diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index b6adb6eb59..6792c633bd 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -36,7 +36,7 @@ use crate::error::{ CleanDirSnafu, DeleteIndexSnafu, DeleteIndexesSnafu, DeleteSstsSnafu, OpenDalSnafu, Result, }; use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED}; -use crate::read::FlatSource; +use crate::read::RecordBatchSource; use crate::region::options::IndexOptions; use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId}; use crate::sst::index::IndexerBuilderImpl; @@ -525,7 +525,7 @@ pub enum OperationType { pub struct SstWriteRequest { pub op_type: OperationType, pub metadata: RegionMetadataRef, - pub source: FlatSource, + pub source: RecordBatchSource, pub cache_manager: CacheManagerRef, #[allow(dead_code)] pub storage: Option, diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index e2483ed4e4..5565f30929 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -513,7 +513,7 @@ mod tests { use crate::cache::test_util::{assert_parquet_metadata_equal, new_fs_store}; use crate::cache::{CacheManager, CacheStrategy}; use crate::error::InvalidBatchSnafu; - use crate::read::FlatSource; + use crate::read::{FlatSource, RecordBatchSource}; use crate::region::options::IndexOptions; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::test_util::TestEnv; @@ -728,9 +728,11 @@ mod tests { let metadata = Arc::new(sst_region_metadata()); // Creates a source that can return an error to abort the writer. + let record_batch = new_record_batch_by_range(&["a", "d"], 0, 60); + let schema = record_batch.schema(); let source = FlatSource::Iter(Box::new( [ - Ok(new_record_batch_by_range(&["a", "d"], 0, 60)), + Ok(record_batch), InvalidBatchSnafu { reason: "Abort the writer", } @@ -738,6 +740,7 @@ mod tests { ] .into_iter(), )); + let source = RecordBatchSource::new(schema, source); // Write to local cache and upload sst to mock remote store let write_request = SstWriteRequest { diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 296d9ce2b1..f2586058fe 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -59,10 +59,10 @@ use crate::error::{ RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu, TimeoutSnafu, }; use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT}; -use crate::read::BoxedRecordBatchStream; use crate::read::flat_projection::FlatProjectionMapper; -use crate::read::scan_region::{PredicateGroup, ScanInput}; +use crate::read::scan_region::{PredicateGroup, ScanInput, concretize_json2_types}; use crate::read::seq_scan::SeqScan; +use crate::read::{FlatSource, RecordBatchSource}; use crate::region::options::{MergeMode, RegionOptions}; use crate::region::version::VersionControlRef; use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState}; @@ -993,12 +993,14 @@ struct CompactionSstReaderBuilder<'a> { impl CompactionSstReaderBuilder<'_> { /// Builds [BoxedRecordBatchStream] that reads all SST files and yields batches in flat format for compaction. - async fn build_flat_sst_reader(self) -> Result { + async fn build_flat_sst_reader(self) -> Result { let scan_input = self.build_scan_input()?.with_compaction(true); - - SeqScan::new(scan_input) + let scan_input = concretize_json2_types(scan_input).await?; + let schema = scan_input.mapper.output_schema().arrow_schema().clone(); + let reader = SeqScan::new(scan_input) .build_flat_reader_for_compaction() - .await + .await?; + Ok(RecordBatchSource::new(schema, FlatSource::Stream(reader))) } fn build_scan_input(self) -> Result { diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 59a8a10077..e5dae0af0c 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -43,7 +43,6 @@ use crate::error::{ }; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; -use crate::read::FlatSource; use crate::region::options::RegionOptions; use crate::region::version::VersionRef; use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState}; @@ -362,8 +361,7 @@ impl SstMerger for DefaultSstMerger { time_range: output.output_time_range, merge_mode, }; - let reader = builder.build_flat_sst_reader().await?; - let source = FlatSource::Stream(reader); + let source = builder.build_flat_sst_reader().await?; let mut metrics = Metrics::new(WriteType::Compaction); let region_metadata = compaction_region.region_metadata.clone(); let sst_infos = compaction_region diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 3af94c08e6..747183c516 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -46,9 +46,9 @@ use crate::metrics::{ FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_FILE_TOTAL, FLUSH_REQUESTS_TOTAL, INFLIGHT_FLUSH_COUNT, }; -use crate::read::FlatSource; use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow}; use crate::read::flat_merge::FlatMergeIterator; +use crate::read::{FlatSource, RecordBatchSource}; use crate::region::options::{IndexOptions, MergeMode, RegionOptions}; use crate::region::version::{VersionControlData, VersionControlRef, VersionRef}; use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState, parse_partition_expr}; @@ -550,14 +550,16 @@ impl RegionFlushTask { write_opts: &WriteOptions, mem_ranges: MemtableRanges, ) -> Result { - let batch_schema = to_flat_sst_arrow_schema( - &version.metadata, - &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding), - ); + let memtable_schema = mem_ranges + .schema() + .unwrap_or_else(|| version.metadata.schema.arrow_schema().clone()); + + let options = FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding); + let batch_schema = to_flat_sst_arrow_schema(&version.metadata, &options); let field_column_start = flat_format::field_column_start(&version.metadata, batch_schema.fields().len()); let flat_sources = memtable_flat_sources( - batch_schema, + batch_schema.clone(), mem_ranges, &version.options, field_column_start, @@ -565,6 +567,7 @@ impl RegionFlushTask { let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len()); let num_encoded = flat_sources.encoded.len(); for (source, max_sequence) in flat_sources.sources { + let source = RecordBatchSource::new(memtable_schema.clone(), source); let write_request = self.new_write_request(version, max_sequence, source); let access_layer = self.access_layer.clone(); let write_opts = write_opts.clone(); @@ -642,7 +645,7 @@ impl RegionFlushTask { &self, version: &VersionRef, max_sequence: u64, - source: FlatSource, + source: RecordBatchSource, ) -> SstWriteRequest { let flat_format = version .options diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index e1494aa47b..3218b3c2ca 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -14,6 +14,7 @@ //! Memtables are write buffers for regions. +use std::borrow::Cow; use std::collections::BTreeMap; use std::fmt; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -62,6 +63,10 @@ pub use bulk::part::{ BulkPart, BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size, sort_primary_key_record_batch, }; +use datatypes::arrow::datatypes::{Schema, SchemaRef}; +use datatypes::extension::json; +use datatypes::schema::ext::ArrowSchemaExt; +use datatypes::types::json_type; #[cfg(any(test, feature = "test"))] pub use time_partition::filter_record_batch; @@ -228,6 +233,55 @@ impl MemtableRanges { .max() .unwrap_or(0) } + + pub(crate) fn schema(&self) -> Option { + let mut schemas = self + .ranges + .values() + .filter_map(|x| x.record_batch_schema()) + .collect::>(); + + if schemas.iter().all(|x| !x.has_json_extension_field()) { + // If there are no JSON extension fields in any schemas, the invariant must be hold, + // that all schemas are same (they are all derived from same region metadata). + // So it's ok to return the first one as the schema of the whole memtable ranges. + return (!schemas.is_empty()).then(|| schemas.swap_remove(0)); + } + + // If there are JSON extension fields, by convention, only their concrete data types + // (Arrow's Struct) may differ. Other things like the metadata or the fields count are same. + // So to produce the final schema, we can solely merge the data types. + schemas + .split_first() + .map(|(first, rest)| merge_json_extension_fields(first, rest)) + } +} + +pub(crate) fn merge_json_extension_fields(base: &SchemaRef, others: &[SchemaRef]) -> SchemaRef { + let mut fields = base.fields().iter().cloned().collect::>(); + for (i, field) in fields.iter_mut().enumerate() { + if !json::is_json_extension_type(field) { + continue; + } + + let merged = others + .iter() + .map(|x| Cow::Borrowed(x.field(i).data_type())) + .reduce(|acc, e| { + Cow::Owned(json_type::merge_as_json_type(acc.as_ref(), e.as_ref()).into_owned()) + }); + if let Some(merged) = merged + && field.data_type() != merged.as_ref() + { + let merged = + json_type::merge_as_json_type(field.data_type(), merged.as_ref()).into_owned(); + + let mut new = field.as_ref().clone(); + new.set_data_type(merged); + *field = Arc::new(new); + } + } + Arc::new(Schema::new_with_metadata(fields, base.metadata().clone())) } impl IterBuilder for MemtableRanges { @@ -558,6 +612,11 @@ pub trait IterBuilder: Send + Sync { .fail() } + /// Returns the schema of record batches produced by this iterator. + fn record_batch_schema(&self) -> Option { + None + } + /// Returns the [EncodedRange] if the range is already encoded into SST. fn encoded_range(&self) -> Option { None @@ -735,6 +794,11 @@ impl MemtableRange { self.context.builder.is_record_batch() } + /// Returns the schema of record batches if this range supports record batch iteration. + pub fn record_batch_schema(&self) -> Option { + self.context.builder.record_batch_schema() + } + pub fn num_rows(&self) -> usize { self.stats.num_rows } diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs index 50330dd3ac..24ab45793b 100644 --- a/src/mito2/src/memtable/bulk.rs +++ b/src/mito2/src/memtable/bulk.rs @@ -816,6 +816,10 @@ impl IterBuilder for BulkRangeIterBuilder { fn encoded_range(&self) -> Option { None } + + fn record_batch_schema(&self) -> Option { + Some(self.part.batch.schema()) + } } impl IterBuilder for MultiBulkRangeIterBuilder { @@ -848,6 +852,10 @@ impl IterBuilder for MultiBulkRangeIterBuilder { fn encoded_range(&self) -> Option { None } + + fn record_batch_schema(&self) -> Option { + self.part.record_batch_schema() + } } /// Iterator builder for encoded bulk range diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index 2b817dcb3a..6049b66723 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -433,7 +433,7 @@ impl UnorderedPart { return Ok(Some(self.parts[0].batch.clone())); } - // Get the schema from the first part + // Get the schema from the first part and normalize JSON2 columns across all parts. let schema = self.parts[0].batch.schema(); let concatenated = if schema.has_json_extension_field() { let (schema, batches) = align_parts(&self.parts)?; @@ -1608,6 +1608,11 @@ impl MultiBulkPart { self.series_count } + /// Returns the schema of batches in this part. + pub(crate) fn record_batch_schema(&self) -> Option { + self.batches.first().map(|batch| batch.schema()) + } + /// Returns the number of record batches in this part. pub fn num_batches(&self) -> usize { self.batches.len() @@ -2115,6 +2120,7 @@ mod tests { &FlatSchemaOptions { raw_pk_columns: false, string_pk_use_dict: true, + ..Default::default() }, ); @@ -2552,6 +2558,7 @@ mod tests { &FlatSchemaOptions { raw_pk_columns: false, string_pk_use_dict: true, + ..Default::default() }, ); diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index aaeaa9e62e..c8a55acca7 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -42,6 +42,7 @@ use std::sync::Arc; use std::time::Duration; use api::v1::OpType; +use arrow_schema::SchemaRef; use async_trait::async_trait; use common_time::Timestamp; use datafusion_common::arrow::array::UInt8Array; @@ -1122,6 +1123,25 @@ impl FlatSource { } } +pub struct RecordBatchSource { + schema: SchemaRef, + inner: FlatSource, +} + +impl RecordBatchSource { + pub fn new(schema: SchemaRef, inner: FlatSource) -> Self { + Self { schema, inner } + } + + pub(crate) fn schema(&self) -> &SchemaRef { + &self.schema + } + + pub(crate) async fn next_batch(&mut self) -> Result> { + self.inner.next_batch().await + } +} + /// Async batch reader. /// /// The reader must guarantee [Batch]es returned by it have the same schema. diff --git a/src/mito2/src/read/flat_merge.rs b/src/mito2/src/read/flat_merge.rs index b1c304f244..d23ad68960 100644 --- a/src/mito2/src/read/flat_merge.rs +++ b/src/mito2/src/read/flat_merge.rs @@ -26,12 +26,14 @@ use datatypes::arrow::datatypes::{ArrowNativeType, BinaryType, DataType, SchemaR use datatypes::arrow::error::ArrowError; use datatypes::arrow::record_batch::RecordBatch; use datatypes::arrow_array::BinaryArray; +use datatypes::extension::json::is_json_extension_type; use datatypes::timestamp::timestamp_array_to_primitive; +use datatypes::vectors::json::array::JsonArray; use futures::{Stream, TryStreamExt}; use snafu::ResultExt; use store_api::storage::SequenceNumber; -use crate::error::{ComputeArrowSnafu, Result}; +use crate::error::{ComputeArrowSnafu, DataTypeMismatchSnafu, Result}; use crate::memtable::BoxedRecordBatchIterator; use crate::metrics::READ_STAGE_ELAPSED; use crate::read::BoxedRecordBatchStream; @@ -258,14 +260,29 @@ impl BatchBuilder { check_interleave_overflow(&self.batches, &self.schema, &self.indices)?; - let columns = (0..self.schema.fields.len()) - .map(|column_idx| { - let arrays: Vec<_> = self + let columns = self + .schema + .fields() + .iter() + .enumerate() + .map(|(column_idx, field)| { + let arrays = self .batches .iter() - .map(|(_, batch)| batch.column(column_idx).as_ref()) - .collect(); - interleave(&arrays, &self.indices).context(ComputeArrowSnafu) + .map(|(_, batch)| { + let column = batch.column(column_idx); + let column = if is_json_extension_type(field) { + JsonArray::from(column) + .try_align(field.data_type()) + .context(DataTypeMismatchSnafu)? + } else { + column.clone() + }; + Ok(column) + }) + .collect::>>()?; + let aligned = arrays.iter().map(|x| x.as_ref()).collect::>(); + interleave(&aligned, &self.indices).context(ComputeArrowSnafu) }) .collect::>>()?; diff --git a/src/mito2/src/read/flat_projection.rs b/src/mito2/src/read/flat_projection.rs index d73bb5a205..0e2cc3a9ea 100644 --- a/src/mito2/src/read/flat_projection.rs +++ b/src/mito2/src/read/flat_projection.rs @@ -17,15 +17,20 @@ use std::sync::Arc; use api::v1::SemanticType; +use arrow_schema::extension::ExtensionType; use common_error::ext::BoxedError; -use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu, NewDfRecordBatchSnafu}; +use common_recordbatch::error::{ + ArrowComputeSnafu, DataTypesSnafu, ExternalSnafu, NewDfRecordBatchSnafu, +}; use common_recordbatch::{DfRecordBatch, RecordBatch}; use datatypes::arrow::array::Array; use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field}; +use datatypes::extension::json::JsonExtensionType; use datatypes::prelude::{ConcreteDataType, DataType}; use datatypes::schema::{Schema, SchemaRef}; use datatypes::value::Value; use datatypes::vectors::Helper; +use datatypes::vectors::json::array::JsonArray; use snafu::{OptionExt, ResultExt}; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::storage::ColumnId; @@ -43,6 +48,7 @@ use crate::sst::{ /// /// This mapper support duplicate and unsorted projection indices. /// The output schema is determined by the projection indices. +#[derive(Clone)] pub struct FlatProjectionMapper { /// Metadata of the region. metadata: RegionMetadataRef, @@ -226,10 +232,8 @@ impl FlatProjectionMapper { self.input_arrow_schema.clone() } else { // For compaction, we need to build a different schema from encoding. - to_flat_sst_arrow_schema( - &self.metadata, - &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding), - ) + let options = FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding); + to_flat_sst_arrow_schema(&self.metadata, &options) } } @@ -240,6 +244,10 @@ impl FlatProjectionMapper { self.output_schema.clone() } + pub(crate) fn with_output_schema(&mut self, output_schema: SchemaRef) { + self.output_schema = output_schema; + } + /// Converts a flat format [RecordBatch] to a normal [RecordBatch]. /// /// The batch must match the `projection` using to build the mapper. @@ -285,6 +293,13 @@ impl FlatProjectionMapper { array = casted; } } + + let field = self.output_schema.arrow_schema().field(output_idx); + if field.extension_type_name() == Some(JsonExtensionType::NAME) { + array = JsonArray::from(&array) + .try_align(field.data_type()) + .context(DataTypesSnafu)?; + } arrays.push(array); } diff --git a/src/mito2/src/read/range.rs b/src/mito2/src/read/range.rs index d667be9cb8..e420a7d00c 100644 --- a/src/mito2/src/read/range.rs +++ b/src/mito2/src/read/range.rs @@ -15,6 +15,7 @@ //! Structs for partition ranges. use common_time::Timestamp; +use datatypes::arrow::datatypes::SchemaRef; use smallvec::{SmallVec, smallvec}; use store_api::region_engine::PartitionRange; use store_api::storage::TimeSeriesDistribution; @@ -478,6 +479,11 @@ impl MemRangeBuilder { pub(crate) fn stats(&self) -> &MemtableStats { &self.stats } + + /// Returns the record batch schema for this memtable range if available. + pub(crate) fn record_batch_schema(&self) -> Option { + self.range.record_batch_schema() + } } #[cfg(test)] diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 374a42144e..c895e6581d 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -14,7 +14,7 @@ //! Scans a region according to the scan request. -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::fmt; use std::num::NonZeroU64; use std::sync::Arc; @@ -27,11 +27,19 @@ use common_recordbatch::filter::SimpleFilterEvaluator; use common_telemetry::tracing::Instrument; use common_telemetry::{debug, error, tracing, warn}; use common_time::range::TimestampRange; +use datafusion::parquet::arrow::parquet_to_arrow_schema; use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr; use datafusion_common::Column; use datafusion_expr::Expr; use datafusion_expr::utils::expr_to_columns; +use datatypes::arrow::datatypes::DataType as ArrowDataType; +use datatypes::data_type::{ConcreteDataType, DataType}; +use datatypes::extension::json::is_json_extension_type; +use datatypes::schema::Schema; +use datatypes::schema::ext::ArrowSchemaExt; +use datatypes::types::json_type; use futures::StreamExt; +use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData}; use partition::expr::PartitionExpr; use smallvec::SmallVec; use snafu::{OptionExt as _, ResultExt}; @@ -46,9 +54,9 @@ use tokio::sync::{Semaphore, mpsc}; use tokio_stream::wrappers::ReceiverStream; use crate::access_layer::AccessLayerRef; -use crate::cache::CacheStrategy; +use crate::cache::{CacheStrategy, CachedSstMeta}; use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES; -use crate::error::{InvalidPartitionExprSnafu, InvalidRequestSnafu, Result}; +use crate::error::{InvalidMetaSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu, Result}; #[cfg(feature = "enterprise")] use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider}; use crate::memtable::{MemtableRange, RangesOptions}; @@ -75,7 +83,8 @@ use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBui #[cfg(feature = "vector_index")] use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef}; use crate::sst::parquet::file_range::PreFilterMode; -use crate::sst::parquet::reader::ReaderMetrics; +use crate::sst::parquet::metadata::MetadataLoader; +use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderMetrics}; #[cfg(feature = "vector_index")] const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2; @@ -530,6 +539,7 @@ impl ScanRegion { .with_merge_mode(self.version.options.merge_mode()) .with_series_row_selector(self.request.series_row_selector) .with_distribution(self.request.distribution) + .with_json2_column_types(self.request.json2_column_types.clone()) .with_explain_flat_format( self.version.options.sst_format == Some(crate::sst::FormatType::Flat), ) @@ -554,6 +564,8 @@ impl ScanRegion { } else { input }; + + let input = concretize_json2_types(input).await?; Ok(input) } @@ -780,6 +792,107 @@ impl ScanRegion { } } +pub(crate) async fn concretize_json2_types(input: ScanInput) -> Result { + let output_schema = input.mapper.output_schema(); + let output_arrow_schema = output_schema.arrow_schema(); + if !output_arrow_schema.has_json_extension_field() { + return Ok(input); + } + + let memtable_schemas = input + .memtables + .iter() + .filter_map(|mem| mem.record_batch_schema()) + .collect::>(); + let parquet_schemas = input.collect_parquet_record_batch_schemas().await?; + if memtable_schemas.is_empty() + && parquet_schemas.is_empty() + // TODO(LFC): If we can concrete json2 type solely by query-driven hint, we can skip data-driven concretize. + && input.json2_column_types.is_empty() + { + return Ok(input); + } + + let mut column_schemas = output_schema.column_schemas().to_vec(); + let mut changed = false; + for (idx, column_schema) in column_schemas.iter_mut().enumerate() { + let output_field = &output_arrow_schema.fields()[idx]; + if !is_json_extension_type(output_field) { + continue; + } + + let mut merged = input + .json2_column_types + .get(&column_schema.name) + .map(ConcreteDataType::as_arrow_type); + for schema in &memtable_schemas { + if let Some((_, field)) = schema.column_with_name(&column_schema.name) { + merge_json_type_candidate(&mut merged, field.data_type()); + } + } + for schema in parquet_schemas.iter() { + if let Some((_, field)) = schema.as_ref().column_with_name(&column_schema.name) { + merge_json_type_candidate(&mut merged, field.data_type()); + } + } + + if let Some(merged) = merged + && merged != *output_field.data_type() + { + column_schema.data_type = ConcreteDataType::from_arrow_type(&merged); + common_telemetry::info!("merged type: {}", column_schema.data_type); + changed = true; + } + } + + if changed { + let mut mapper = Arc::unwrap_or_clone(input.mapper); + mapper.with_output_schema(Arc::new(Schema::new(column_schemas))); + Ok(ScanInput { + mapper: Arc::new(mapper), + ..input + }) + } else { + Ok(input) + } +} + +fn merge_json_type_candidate(merged: &mut Option, candidate: &ArrowDataType) { + match merged { + Some(current) => { + *current = json_type::merge_as_json_type(current, candidate).into_owned(); + } + None => { + *merged = Some(candidate.clone()); + } + } +} + +async fn read_or_load_parquet_metadata( + file: &FileHandle, + access_layer: &AccessLayerRef, + cache_strategy: &CacheStrategy, +) -> Result> { + let mut metrics = MetadataCacheMetrics::default(); + if let Some(metadata) = cache_strategy + .get_sst_meta_data(file.file_id(), &mut metrics, PageIndexPolicy::default()) + .await + { + return Ok(metadata.parquet_metadata()); + } + + let file_path = file.file_path(access_layer.table_dir(), access_layer.path_type()); + let file_size = file.meta_ref().file_size; + let metadata = MetadataLoader::new(access_layer.object_store().clone(), &file_path, file_size) + .load(&mut metrics) + .await + .and_then(|x| CachedSstMeta::try_new(&file_path, x)) + .map(Arc::new)?; + cache_strategy.put_sst_meta_data(file.file_id(), metadata.clone()); + + Ok(metadata.parquet_metadata()) +} + /// Returns true if the time range of a SST `file` matches the `predicate`. fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool { if predicate == &TimestampRange::min_to_max() { @@ -839,6 +952,8 @@ pub struct ScanInput { pub(crate) series_row_selector: Option, /// Hint for the required distribution of the scanner. pub(crate) distribution: Option, + /// Query-driven target types for JSON2 columns. + json2_column_types: HashMap, /// Whether the region's configured SST format is flat. explain_flat_format: bool, /// Snapshot upper bound bound at scan open and propagated back to the caller. @@ -878,6 +993,7 @@ impl ScanInput { merge_mode: MergeMode::default(), series_row_selector: None, distribution: None, + json2_column_types: HashMap::new(), explain_flat_format: false, snapshot_sequence: None, compaction: false, @@ -915,6 +1031,15 @@ impl ScanInput { self } + #[must_use] + fn with_json2_column_types( + mut self, + json2_column_types: HashMap, + ) -> Self { + self.json2_column_types = json2_column_types; + self + } + /// Sets cache for this query. #[must_use] pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self { @@ -1288,6 +1413,35 @@ impl ScanInput { pre_filter_mode(self.append_mode, self.merge_mode) } + + pub(crate) async fn collect_parquet_record_batch_schemas( + &self, + ) -> Result> { + let mut schemas = Vec::with_capacity(self.files.len()); + for file in &self.files { + let parquet_metadata = + read_or_load_parquet_metadata(file, &self.access_layer, &self.cache_strategy) + .await?; + let file_metadata = parquet_metadata.file_metadata(); + let arrow_schema = parquet_to_arrow_schema( + file_metadata.schema_descr(), + file_metadata.key_value_metadata(), + ) + .map_err(|e| { + InvalidMetaSnafu { + reason: format!( + "Failed to convert parquet metadata to arrow schema, file: {}, error: {e}", + file.file_id() + ), + } + .build() + })?; + if arrow_schema.has_json_extension_field() { + schemas.push(Arc::new(arrow_schema)); + } + } + Ok(schemas) + } } #[cfg(feature = "enterprise")] diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 432099dbcf..6bc796c85c 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -130,7 +130,7 @@ impl SeqScan { /// /// # Panics /// Panics if the compaction flag is not set. - pub async fn build_flat_reader_for_compaction(&self) -> Result { + pub(crate) async fn build_flat_reader_for_compaction(&self) -> Result { assert!(self.stream_ctx.input.compaction); let metrics_set = ExecutionPlanMetricsSet::new(); diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs index c769f78c6c..d9f0c76506 100644 --- a/src/mito2/src/sst.rs +++ b/src/mito2/src/sst.rs @@ -91,6 +91,7 @@ pub struct FlatSchemaOptions { /// when storing primary key columns. /// Only takes effect when `raw_pk_columns` is true. pub string_pk_use_dict: bool, + pub override_schema: Option, } impl Default for FlatSchemaOptions { @@ -98,6 +99,7 @@ impl Default for FlatSchemaOptions { Self { raw_pk_columns: true, string_pk_use_dict: true, + override_schema: None, } } } @@ -111,6 +113,7 @@ impl FlatSchemaOptions { Self { raw_pk_columns: false, string_pk_use_dict: false, + override_schema: None, } } } diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index d8d1f91e3d..743fdd9c88 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -141,7 +141,7 @@ mod tests { use crate::cache::test_util::assert_parquet_metadata_equal; use crate::cache::{CacheManager, CacheStrategy, PageKey}; use crate::config::IndexConfig; - use crate::read::FlatSource; + use crate::read::RecordBatchSource; use crate::region::options::{IndexOptions, InvertedIndexOptions}; use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId}; use crate::sst::file_purger::NoopFilePurger; @@ -594,7 +594,7 @@ mod tests { let writer_props = props_builder.build(); - let write_format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default()); + let write_format = FlatWriteFormat::new(metadata.schema.arrow_schema().clone()); let fields: Vec<_> = write_format .arrow_schema() .fields() @@ -1239,7 +1239,7 @@ mod tests { metadata: Arc, indexer_builder: IndexerBuilderImpl, file_path: RegionFilePathFactory, - flat_source: FlatSource, + source: RecordBatchSource, write_opts: &WriteOptions, ) -> SstInfo { let mut metrics = Metrics::new(WriteType::Flush); @@ -1254,7 +1254,7 @@ mod tests { .await; writer - .write_all_flat(flat_source, None, write_opts) + .write_all_flat(source, None, write_opts) .await .unwrap() .remove(0) diff --git a/src/mito2/src/sst/parquet/flat_format.rs b/src/mito2/src/sst/parquet/flat_format.rs index 958dc0c8b2..3b720b61ba 100644 --- a/src/mito2/src/sst/parquet/flat_format.rs +++ b/src/mito2/src/sst/parquet/flat_format.rs @@ -49,7 +49,7 @@ use store_api::storage::{ColumnId, SequenceNumber}; use crate::error::{ ComputeArrowSnafu, DecodeSnafu, InvalidParquetSnafu, InvalidRecordBatchSnafu, - NewRecordBatchSnafu, Result, + NewRecordBatchSnafu, RecordBatchSnafu, Result, }; use crate::sst::parquet::format::{ FIXED_POS_COLUMN_NUM, FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray, @@ -69,8 +69,7 @@ pub(crate) struct FlatWriteFormat { impl FlatWriteFormat { /// Creates a new helper. - pub(crate) fn new(metadata: RegionMetadataRef, options: &FlatSchemaOptions) -> FlatWriteFormat { - let arrow_schema = to_flat_sst_arrow_schema(&metadata, options); + pub(crate) fn new(arrow_schema: SchemaRef) -> FlatWriteFormat { FlatWriteFormat { arrow_schema, override_sequence: None, @@ -104,7 +103,12 @@ impl FlatWriteFormat { let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()])); columns[sequence_column_index(batch.num_columns())] = sequence_array; - RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu) + let columns = common_recordbatch::recordbatch::maybe_align_json_array_with_schema( + &self.arrow_schema, + columns, + ) + .context(RecordBatchSnafu)?; + RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu) } } diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index a5c5fa68ab..ba459e92de 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -1094,7 +1094,7 @@ mod tests { #[test] fn test_flat_to_sst_arrow_schema() { let metadata = build_test_region_metadata(); - let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default()); + let format = FlatWriteFormat::new(metadata.schema.arrow_schema().clone()); assert_eq!(&build_test_flat_sst_schema(), format.arrow_schema()); } @@ -1114,7 +1114,7 @@ mod tests { #[test] fn test_flat_convert_batch() { let metadata = build_test_region_metadata(); - let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default()); + let format = FlatWriteFormat::new(metadata.schema.arrow_schema().clone()); let num_rows = 4; let columns: Vec = input_columns_for_flat_batch(num_rows); @@ -1128,7 +1128,7 @@ mod tests { #[test] fn test_flat_convert_with_override_sequence() { let metadata = build_test_region_metadata(); - let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default()) + let format = FlatWriteFormat::new(metadata.schema.arrow_schema().clone()) .with_override_sequence(Some(415411)); let num_rows = 4; diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 5688133c46..d1432b3f4e 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -446,8 +446,7 @@ impl ParquetReaderBuilder { .unwrap_or_else(|| region_meta.schema.clone()); // Create ArrowReaderMetadata for async stream building. - let arrow_reader_options = - ArrowReaderOptions::new().with_schema(read_format.arrow_schema().clone()); + let arrow_reader_options = ArrowReaderOptions::new(); let arrow_metadata = ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options) .context(ReadDataPartSnafu)?; diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 13005ff9fc..af7010563e 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -22,6 +22,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::{Context, Poll}; use std::time::Instant; +use arrow_schema::Schema; use common_telemetry::debug; use common_time::Timestamp; use datatypes::arrow::array::{ @@ -31,6 +32,8 @@ use datatypes::arrow::array::{ use datatypes::arrow::compute::{max, min}; use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit}; use datatypes::arrow::record_batch::RecordBatch; +use datatypes::extension::json::is_json_extension_type; +use datatypes::schema::ext::ArrowSchemaExt; use object_store::{FuturesAsyncWriter, ObjectStore}; use parquet::arrow::AsyncArrowWriter; use parquet::basic::{Compression, Encoding, ZstdLevel}; @@ -50,7 +53,7 @@ use crate::config::{IndexBuildMode, IndexConfig}; use crate::error::{ InvalidMetadataSnafu, OpenDalSnafu, Result, UnexpectedSnafu, WriteParquetSnafu, }; -use crate::read::FlatSource; +use crate::read::RecordBatchSource; use crate::sst::file::RegionFileId; use crate::sst::index::{IndexOutput, Indexer, IndexerBuilder}; use crate::sst::parquet::flat_format::{FlatWriteFormat, time_index_column_index}; @@ -58,6 +61,7 @@ use crate::sst::parquet::format::PrimaryKeyWriteFormat; use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo, WriteOptions}; use crate::sst::{ DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, SeriesEstimator, + to_flat_sst_arrow_schema, }; /// Converts a flat RecordBatch for writing to parquet. @@ -267,16 +271,29 @@ where /// Returns the [SstInfo] if the SST is written. pub async fn write_all_flat( &mut self, - source: FlatSource, + source: RecordBatchSource, override_sequence: Option, opts: &WriteOptions, ) -> Result { + let options = FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding); + + let mut schema = to_flat_sst_arrow_schema(&self.metadata, &options); + if schema.has_json_extension_field() { + let mut fields = Vec::with_capacity(schema.fields().len()); + for field in schema.fields() { + if is_json_extension_type(field) + && let Some((_, override_field)) = source.schema().fields().find(field.name()) + { + fields.push(override_field.clone()); + } else { + fields.push(field.clone()); + } + } + schema = Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone())); + } + let converter = FlatBatchConverter::Flat( - FlatWriteFormat::new( - self.metadata.clone(), - &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding), - ) - .with_override_sequence(override_sequence), + FlatWriteFormat::new(schema).with_override_sequence(override_sequence), ); let res = self.write_all_flat_inner(source, &converter, opts).await; if res.is_err() { @@ -293,7 +310,7 @@ where /// Returns the [SstInfo] if the SST is written. pub async fn write_all_flat_as_primary_key( &mut self, - source: FlatSource, + source: RecordBatchSource, override_sequence: Option, opts: &WriteOptions, ) -> Result { @@ -315,7 +332,7 @@ where async fn write_all_flat_inner( &mut self, - mut source: FlatSource, + mut source: RecordBatchSource, converter: &FlatBatchConverter, opts: &WriteOptions, ) -> Result { @@ -386,7 +403,7 @@ where async fn write_next_flat_batch( &mut self, - source: &mut FlatSource, + source: &mut RecordBatchSource, converter: &FlatBatchConverter, opts: &WriteOptions, ) -> Result> { diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index 4e759f50cd..e045160a21 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -36,7 +36,7 @@ use store_api::metric_engine_consts::{ use store_api::storage::consts::ReservedColumnId; use store_api::storage::{FileId, RegionId}; -use crate::read::{Batch, FlatSource}; +use crate::read::{Batch, FlatSource, RecordBatchSource}; use crate::sst::file::{FileHandle, FileMeta}; use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema}; use crate::test_util::{new_batch_builder, new_noop_file_purger}; @@ -306,8 +306,11 @@ pub fn new_record_batch_with_custom_sequence( } /// Creates a FlatSource from flat format RecordBatches. -pub fn new_flat_source_from_record_batches(batches: Vec) -> FlatSource { - FlatSource::Iter(Box::new(batches.into_iter().map(Ok))) +pub(crate) fn new_flat_source_from_record_batches(batches: Vec) -> RecordBatchSource { + RecordBatchSource::new( + batches[0].schema(), + FlatSource::Iter(Box::new(batches.into_iter().map(Ok))), + ) } /// Creates a new region metadata for testing SSTs with binary datatype. diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 6fc78c59e5..2aa9af7978 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -15,6 +15,7 @@ //! Planner, QueryEngine implementations based on DataFusion. mod error; +mod json2_expr_planner; mod planner; use std::any::Any; diff --git a/src/query/src/datafusion/json2_expr_planner.rs b/src/query/src/datafusion/json2_expr_planner.rs new file mode 100644 index 0000000000..8ff5eabc85 --- /dev/null +++ b/src/query/src/datafusion/json2_expr_planner.rs @@ -0,0 +1,131 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use arrow_schema::Field; +use arrow_schema::extension::ExtensionType; +use common_function::scalars::json::json_get::JsonGetWithType; +use common_function::scalars::udf::create_udf; +use datafusion_common::arrow::datatypes::DataType; +use datafusion_common::{Column, DataFusionError, Result, ScalarValue, TableReference}; +use datafusion_expr::expr::{BinaryExpr, ScalarFunction}; +use datafusion_expr::planner::{ExprPlanner, PlannerResult, RawBinaryExpr}; +use datafusion_expr::{Expr, ExprSchemable, Operator}; +use datatypes::extension::json::JsonExtensionType; +use sqlparser::ast::BinaryOperator; + +#[derive(Debug)] +pub(crate) struct Json2ExprPlanner; + +fn json_get(base: Expr, path: String) -> Result { + let args = vec![ + base, + Expr::Literal(ScalarValue::Utf8(Some(path)), None), + datatype_expr(&DataType::Utf8View)?, + ]; + let function = create_udf(Arc::new(JsonGetWithType::default())); + Ok(Expr::ScalarFunction(ScalarFunction::new_udf( + Arc::new(function), + args, + ))) +} + +impl ExprPlanner for Json2ExprPlanner { + fn plan_binary_op( + &self, + expr: RawBinaryExpr, + schema: &datafusion_common::DFSchema, + ) -> Result> { + let Some(operator) = parse_sql_binary_op(&expr.op) else { + return Ok(PlannerResult::Original(expr)); + }; + + let left_type = expr.left.get_type(schema)?; + let right_type = expr.right.get_type(schema)?; + let left_rewritten = rewrite_expr_json_get(&expr.left, right_type)?; + let right_rewritten = rewrite_expr_json_get(&expr.right, left_type)?; + if left_rewritten.is_none() && right_rewritten.is_none() { + return Ok(PlannerResult::Original(expr)); + } + + let rewritten = Expr::BinaryExpr(BinaryExpr::new( + Box::new(left_rewritten.unwrap_or(expr.left)), + operator, + Box::new(right_rewritten.unwrap_or(expr.right)), + )); + common_telemetry::debug!("json2 plan_binary_op: rewritten={rewritten:?}"); + Ok(PlannerResult::Planned(rewritten)) + } + + fn plan_compound_identifier( + &self, + field: &Field, + qualifier: Option<&TableReference>, + nested_names: &[String], + ) -> Result>> { + if field.extension_type_name() != Some(JsonExtensionType::NAME) { + return Ok(PlannerResult::Original(Vec::new())); + } + + let path = nested_names.join("."); + let column = Column::from((qualifier, field)); + json_get(Expr::Column(column), path).map(PlannerResult::Planned) + } +} + +fn rewrite_expr_json_get(expr: &Expr, data_type: DataType) -> Result> { + let Expr::ScalarFunction(func) = expr else { + return Ok(None); + }; + if func.func.name() != JsonGetWithType::NAME { + return Ok(None); + } + if func.args.len() != 3 { + return Err(DataFusionError::Internal(format!( + "Function {} is expected to have 3 arguments!", + func.name() + ))); + } + + let expected_expr = datatype_expr(&data_type)?; + let rewritten = Expr::ScalarFunction(ScalarFunction { + func: func.func.clone(), + args: vec![func.args[0].clone(), func.args[1].clone(), expected_expr], + }); + Ok(Some(rewritten)) +} + +fn parse_sql_binary_op(op: &BinaryOperator) -> Option { + match *op { + BinaryOperator::Gt => Some(Operator::Gt), + BinaryOperator::GtEq => Some(Operator::GtEq), + BinaryOperator::Lt => Some(Operator::Lt), + BinaryOperator::LtEq => Some(Operator::LtEq), + BinaryOperator::Eq => Some(Operator::Eq), + BinaryOperator::NotEq => Some(Operator::NotEq), + BinaryOperator::Plus => Some(Operator::Plus), + BinaryOperator::Minus => Some(Operator::Minus), + BinaryOperator::Multiply => Some(Operator::Multiply), + BinaryOperator::Divide => Some(Operator::Divide), + BinaryOperator::Modulo => Some(Operator::Modulo), + BinaryOperator::And => Some(Operator::And), + BinaryOperator::Or => Some(Operator::Or), + _ => None, + } +} + +fn datatype_expr(data_type: &DataType) -> Result { + ScalarValue::try_new_null(data_type).map(|x| Expr::Literal(x, None)) +} diff --git a/src/query/src/datafusion/planner.rs b/src/query/src/datafusion/planner.rs index d9c74b9d5a..7088111774 100644 --- a/src/query/src/datafusion/planner.rs +++ b/src/query/src/datafusion/planner.rs @@ -38,6 +38,7 @@ use datafusion_sql::parser::Statement as DfStatement; use session::context::QueryContextRef; use snafu::{Location, ResultExt}; +use crate::datafusion::json2_expr_planner::Json2ExprPlanner; use crate::error::{CatalogSnafu, Result}; use crate::query_engine::{DefaultPlanDecoder, QueryEngineState}; @@ -87,6 +88,9 @@ impl DfContextProviderAdapter { .map(|format| (format.get_ext().to_lowercase(), format)) .collect(); + let mut expr_planners = SessionStateDefaults::default_expr_planners(); + expr_planners.insert(0, Arc::new(Json2ExprPlanner)); + Ok(Self { engine_state, session_state, @@ -94,7 +98,7 @@ impl DfContextProviderAdapter { table_provider, query_ctx, file_formats, - expr_planners: SessionStateDefaults::default_expr_planners(), + expr_planners, }) } } diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs index 15001a81fa..b68527a188 100644 --- a/src/query/src/dummy_catalog.rs +++ b/src/query/src/dummy_catalog.rs @@ -15,6 +15,7 @@ //! Dummy catalog for region server. use std::any::Any; +use std::collections::HashMap; use std::fmt; use std::sync::{Arc, Mutex}; @@ -30,6 +31,7 @@ use datafusion::physical_plan::ExecutionPlan; use datafusion_common::DataFusionError; use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; use datatypes::arrow::datatypes::SchemaRef; +use datatypes::data_type::ConcreteDataType; use futures::stream::BoxStream; use session::context::{QueryContext, QueryContextRef}; use snafu::ResultExt; @@ -282,6 +284,10 @@ impl DummyTableProvider { self.scan_request.lock().unwrap().vector_search.clone() } + pub fn with_json2_type_hint(&self, json2_column_types: &HashMap) { + self.scan_request.lock().unwrap().json2_column_types = json2_column_types.clone(); + } + pub fn with_sequence(&self, sequence: u64) { self.scan_request.lock().unwrap().memtable_max_sequence = Some(sequence); } diff --git a/src/query/src/optimizer.rs b/src/query/src/optimizer.rs index aaac1e3124..6deeee1a4a 100644 --- a/src/query/src/optimizer.rs +++ b/src/query/src/optimizer.rs @@ -15,6 +15,7 @@ pub mod constant_term; pub mod count_nest_aggr; pub mod count_wildcard; +pub mod json2_scan_hint; pub mod parallelize_scan; pub mod pass_distribution; pub mod remove_duplicate; diff --git a/src/query/src/optimizer/json2_scan_hint.rs b/src/query/src/optimizer/json2_scan_hint.rs new file mode 100644 index 0000000000..df16ff0477 --- /dev/null +++ b/src/query/src/optimizer/json2_scan_hint.rs @@ -0,0 +1,225 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use common_function::scalars::json::json_get::JsonGetWithType; +use datafusion::datasource::DefaultTableSource; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; +use datafusion_common::{Result, ScalarValue, TableReference, internal_err}; +use datafusion_expr::expr::ScalarFunction; +use datafusion_expr::{Expr, LogicalPlan}; +use datafusion_optimizer::{OptimizerConfig, OptimizerRule}; +use datatypes::data_type::ConcreteDataType; +use datatypes::json::requirement::JsonPathTarget; +use datatypes::types::JsonFormat; + +use crate::dummy_catalog::DummyTableProvider; + +#[derive(Debug)] +pub struct Json2ScanHintRule; + +impl OptimizerRule for Json2ScanHintRule { + fn name(&self) -> &str { + "Json2ScanHintRule" + } + + fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result> { + let requirements = Json2TypeRequirements::collect(&plan)?; + if requirements.is_empty() { + return Ok(Transformed::no(plan)); + } + + plan.transform_down(&mut |plan| match &plan { + LogicalPlan::TableScan(table_scan) => { + let Some(source) = table_scan + .source + .as_any() + .downcast_ref::() + else { + return Ok(Transformed::no(plan)); + }; + + let Some(adapter) = source + .table_provider + .as_any() + .downcast_ref::() + else { + return Ok(Transformed::no(plan)); + }; + + let hints = + requirements.merge(&table_scan.table_name, &adapter.region_metadata().schema); + adapter.with_json2_type_hint(&hints); + Ok(Transformed::yes(plan)) + } + _ => Ok(Transformed::no(plan)), + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct Json2ColumnKey { + relation: Option, + name: String, +} + +#[derive(Debug, Default)] +struct Json2TypeRequirements { + path_targets: HashMap, +} + +impl Json2TypeRequirements { + fn collect(plan: &LogicalPlan) -> Result { + let mut collector = Self::default(); + plan.apply(|node| { + for expr in node.expressions() { + let _ = expr.apply(|expr| { + if let Some((column, path, data_type)) = extract_json_get(expr)? { + collector + .path_targets + .entry(column) + .or_default() + .require_typed_path(&path, data_type); + } + Ok(TreeNodeRecursion::Continue) + })?; + } + Ok(TreeNodeRecursion::Continue) + })?; + Ok(collector) + } + + fn is_empty(&self) -> bool { + self.path_targets.is_empty() + } + + fn merge( + &self, + table_name: &TableReference, + schema: &datatypes::schema::SchemaRef, + ) -> HashMap { + let mut types = HashMap::new(); + + for column_schema in schema.column_schemas() { + let ConcreteDataType::Json(json_type) = &column_schema.data_type else { + continue; + }; + if !matches!(json_type.format, JsonFormat::Json2(_)) { + continue; + } + + let matching_keys = self + .path_targets + .iter() + .filter(|(key, _)| { + key.name == column_schema.name + && key.relation.as_ref().is_none_or(|x| x == table_name) + }) + .map(|(_, target)| target.clone()) + .collect::>(); + if matching_keys.is_empty() { + continue; + } + + let mut merged = JsonPathTarget::default(); + for target in matching_keys { + if let Some(data_type) = target.build_type() { + merge_path_target_from_type(&mut merged, &data_type, ""); + } + } + if let Some(data_type) = merged.build_type() { + let _ = types.insert(column_schema.name.clone(), data_type); + } + } + + types + } +} + +fn extract_json_get(expr: &Expr) -> Result> { + let Expr::ScalarFunction(ScalarFunction { func, args }) = expr else { + return Ok(None); + }; + if func.name() != JsonGetWithType::NAME { + return Ok(None); + } + if args.len() != 3 { + return internal_err!("function {} must have 3 arguments", JsonGetWithType::NAME); + } + + let Expr::Column(column) = &args[0] else { + return Ok(None); + }; + + let path = match &args[1] { + Expr::Literal(ScalarValue::Utf8(Some(path)), _) + | Expr::Literal(ScalarValue::LargeUtf8(Some(path)), _) + | Expr::Literal(ScalarValue::Utf8View(Some(path)), _) => path.clone(), + _ => return Ok(None), + }; + + let data_type = args + .get(2) + .and_then(extract_expected_type) + .unwrap_or_else(ConcreteDataType::string_datatype); + + Ok(Some(( + Json2ColumnKey { + relation: column.relation.clone(), + name: column.name.clone(), + }, + path, + data_type, + ))) +} + +fn extract_expected_type(expr: &Expr) -> Option { + match expr { + Expr::Literal(value, _) => { + let data_type = value.data_type(); + Some(ConcreteDataType::from_arrow_type(&data_type)) + } + _ => None, + } +} + +fn merge_path_target_from_type( + target: &mut JsonPathTarget, + data_type: &ConcreteDataType, + prefix: &str, +) { + match data_type { + ConcreteDataType::Struct(struct_type) => { + let fields = struct_type.fields(); + for field in fields.iter() { + let path = if prefix.is_empty() { + field.name().to_string() + } else { + format!("{prefix}.{}", field.name()) + }; + merge_path_target_from_type(target, field.data_type(), &path); + } + } + _ => { + if !prefix.is_empty() { + target.require_typed_path(prefix, data_type.clone()); + } + } + } +} diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index f696c8b53e..e27006f224 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -62,6 +62,7 @@ use crate::optimizer::ExtensionAnalyzerRule; use crate::optimizer::constant_term::MatchesConstantTermOptimizer; use crate::optimizer::count_nest_aggr::CountNestAggrRule; use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule; +use crate::optimizer::json2_scan_hint::Json2ScanHintRule; use crate::optimizer::parallelize_scan::ParallelizeScan; use crate::optimizer::pass_distribution::PassDistribution; use crate::optimizer::remove_duplicate::RemoveDuplicate; @@ -174,6 +175,7 @@ impl QueryEngineState { let mut optimizer = Optimizer::new(); optimizer.rules.push(Arc::new(ScanHintRule)); + optimizer.rules.push(Arc::new(Json2ScanHintRule)); // add physical optimizer let mut physical_optimizer = PhysicalOptimizer::new(); diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index 9d2f1f2006..cccb19ced2 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -12,12 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::fmt::{Display, Formatter}; use common_error::ext::BoxedError; use common_recordbatch::OrderOption; use datafusion_expr::expr::Expr; // Re-export vector types from datatypes to avoid duplication +use datatypes::data_type::ConcreteDataType; pub use datatypes::schema::{VectorDistanceMetric, VectorIndexEngineType}; use strum::Display; @@ -128,6 +130,8 @@ pub struct ScanRequest { /// Optional hint for KNN vector search. When set, the scan should use /// vector index to find the k nearest neighbors. pub vector_search: Option, + /// Optional target types for query-driven JSON2 concretization. + pub json2_column_types: HashMap, } impl ScanRequest { @@ -227,6 +231,14 @@ impl Display for ScanRequest { vector_search.metric )?; } + if !self.json2_column_types.is_empty() { + write!( + f, + "{}json2_column_types: {:?}", + delimiter.as_str(), + self.json2_column_types + )?; + } write!(f, " }}") } } diff --git a/tests-integration/tests/jsonbench.rs b/tests-integration/tests/jsonbench.rs index 55cfcd53f0..02692e7e82 100644 --- a/tests-integration/tests/jsonbench.rs +++ b/tests-integration/tests/jsonbench.rs @@ -153,16 +153,10 @@ async fn query_data(frontend: &Arc) -> io::Result<()> { +----------+"#; execute_sql_and_expect(frontend, sql, expected).await; - let sql = "SELECT * FROM bluesky ORDER BY time_us"; - let expected = fs::read_to_string(find_workspace_path( - "tests-integration/resources/jsonbench-select-all.txt", - ))?; - execute_sql_and_expect(frontend, sql, &expected).await; - // query 1: let sql = " SELECT - json_get_string(data, '$.commit.collection') AS event, count() AS count + data.commit.collection AS event, count() AS count FROM bluesky GROUP BY event ORDER BY count DESC, event ASC"; @@ -180,13 +174,12 @@ ORDER BY count DESC, event ASC"; // query 2: let sql = " SELECT - json_get_string(data, '$.commit.collection') AS event, + data.commit.collection AS event, count() AS count, - count(DISTINCT json_get_string(data, '$.did')) AS users + count(DISTINCT data.did) AS users FROM bluesky WHERE - (json_get_string(data, '$.kind') = 'commit') AND - (json_get_string(data, '$.commit.operation') = 'create') + data.kind = 'commit' AND data.commit.operation = 'create' GROUP BY event ORDER BY count DESC, event ASC"; let expected = r#" @@ -203,15 +196,14 @@ ORDER BY count DESC, event ASC"; // query 3: let sql = " SELECT - json_get_string(data, '$.commit.collection') AS event, - date_part('hour', to_timestamp_micros(json_get_int(data, '$.time_us'))) as hour_of_day, + data.commit.collection AS event, + date_part('hour', to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) as hour_of_day, count() AS count FROM bluesky WHERE - (json_get_string(data, '$.kind') = 'commit') AND - (json_get_string(data, '$.commit.operation') = 'create') AND - json_get_string(data, '$.commit.collection') IN - ('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like') + data.kind = 'commit' AND + data.commit.operation = 'create' AND + data.commit.collection in ('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like') GROUP BY event, hour_of_day ORDER BY hour_of_day, event"; let expected = r#" @@ -227,13 +219,13 @@ ORDER BY hour_of_day, event"; // query 4: let sql = " SELECT - json_get_string(data, '$.did') as user_id, - min(to_timestamp_micros(json_get_int(data, '$.time_us'))) AS first_post_ts + data.did::String as user_id, + min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) AS first_post_ts FROM bluesky WHERE - (json_get_string(data, '$.kind') = 'commit') AND - (json_get_string(data, '$.commit.operation') = 'create') AND - (json_get_string(data, '$.commit.collection') = 'app.bsky.feed.post') + data.kind = 'commit' AND + data.commit.operation = 'create' AND + data.commit.collection = 'app.bsky.feed.post' GROUP BY user_id ORDER BY first_post_ts ASC, user_id DESC LIMIT 3"; @@ -250,17 +242,17 @@ LIMIT 3"; // query 5: let sql = " SELECT - json_get_string(data, '$.did') as user_id, + data.did::String as user_id, date_part( 'epoch', - max(to_timestamp_micros(json_get_int(data, '$.time_us'))) - - min(to_timestamp_micros(json_get_int(data, '$.time_us'))) + max(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) - + min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) ) AS activity_span FROM bluesky WHERE - (json_get_string(data, '$.kind') = 'commit') AND - (json_get_string(data, '$.commit.operation') = 'create') AND - (json_get_string(data, '$.commit.collection') = 'app.bsky.feed.post') + data.kind = 'commit' AND + data.commit.operation = 'create' AND + data.commit.collection = 'app.bsky.feed.post' GROUP BY user_id ORDER BY activity_span DESC, user_id DESC LIMIT 3"; @@ -304,30 +296,21 @@ async fn insert_data_by_sql(frontend: &Arc) -> io::Result<()> { async fn desc_table(frontend: &Arc) { let sql = "DESC TABLE bluesky"; let expected = r#" -+---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+ -| Column | Type | Key | Null | Default | Semantic Type | -+---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+ -| data | Json<{"_raw":"","commit.collection":"","commit.operation":"","did":"","kind":"","time_us":""}> | | YES | | FIELD | -| time_us | TimestampMicrosecond | PRI | NO | | TIMESTAMP | -+---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+"#; ++---------+----------------------+-----+------+---------+---------------+ +| Column | Type | Key | Null | Default | Semantic Type | ++---------+----------------------+-----+------+---------+---------------+ +| data | JSON2 | | YES | | FIELD | +| time_us | TimestampMicrosecond | PRI | NO | | TIMESTAMP | ++---------+----------------------+-----+------+---------+---------------+"#; execute_sql_and_expect(frontend, sql, expected).await; } async fn create_table(frontend: &Arc) { let sql = r#" CREATE TABLE bluesky ( - "data" JSON ( - format = "partial", - fields = Struct< - kind String, - "commit.operation" String, - "commit.collection" String, - did String, - time_us Bigint - >, - ), + "data" JSON2, time_us TimestampMicrosecond TIME INDEX, -) +) WITH ('append_mode' = 'true', 'sst_format' = 'flat') "#; execute_sql_and_expect(frontend, sql, "Affected Rows: 0").await; } diff --git a/tests/cases/standalone/common/tql-explain-analyze/explain.result b/tests/cases/standalone/common/tql-explain-analyze/explain.result index 1e4cf18b40..d43f3a757f 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/explain.result +++ b/tests/cases/standalone/common/tql-explain-analyze/explain.result @@ -131,6 +131,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test; |_|_TableScan: test_| |_| ]]_| | logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_| +| logical_plan after Json2ScanHintRule_| SAME TEXT AS ABOVE_| | logical_plan after rewrite_set_comparison_| SAME TEXT AS ABOVE_| | logical_plan after optimize_unions_| SAME TEXT AS ABOVE_| | logical_plan after simplify_expressions_| SAME TEXT AS ABOVE_| @@ -156,6 +157,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test; | logical_plan after push_down_leaf_projections_| SAME TEXT AS ABOVE_| | logical_plan after optimize_projections_| SAME TEXT AS ABOVE_| | logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_| +| logical_plan after Json2ScanHintRule_| SAME TEXT AS ABOVE_| | logical_plan_| MergeScan [is_placeholder=false, remote_input=[_| |_| PromInstantManipulate: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_| |_|_PromSeriesDivide: tags=["k"]_| @@ -276,6 +278,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test AS series; |_|_TableScan: test_| |_| ]]_| | logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_| +| logical_plan after Json2ScanHintRule_| SAME TEXT AS ABOVE_| | logical_plan after rewrite_set_comparison_| SAME TEXT AS ABOVE_| | logical_plan after optimize_unions_| SAME TEXT AS ABOVE_| | logical_plan after simplify_expressions_| SAME TEXT AS ABOVE_| @@ -301,6 +304,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test AS series; | logical_plan after push_down_leaf_projections_| SAME TEXT AS ABOVE_| | logical_plan after optimize_projections_| SAME TEXT AS ABOVE_| | logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_| +| logical_plan after Json2ScanHintRule_| SAME TEXT AS ABOVE_| | logical_plan_| MergeScan [is_placeholder=false, remote_input=[_| |_| Projection: test.i AS series, test.k, test.j_| |_|_PromInstantManipulate: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_| diff --git a/tests/cases/standalone/common/types/json/json2.result b/tests/cases/standalone/common/types/json/json2.result index e13e2307e1..45d218c00c 100644 --- a/tests/cases/standalone/common/types/json/json2.result +++ b/tests/cases/standalone/common/types/json/json2.result @@ -42,6 +42,14 @@ admin flush_table('json2_table'); | 0 | +----------------------------------+ +admin compact_table('json2_table', 'swcs', '86400'); + ++-----------------------------------------------------+ +| ADMIN compact_table('json2_table', 'swcs', '86400') | ++-----------------------------------------------------+ +| 0 | ++-----------------------------------------------------+ + insert into json2_table values (7, '{"a": {"b": "s7"}, "c": [1], "d": [{"e": {"g": -0.7}}]}'), (8, '{"a": {"b": 8}, "c": "s8"}'); @@ -62,6 +70,112 @@ values (9, '{"a": {"x": true}, "c": "s9", "d": [{"e": {"g": -0.9}}]}'), Affected Rows: 2 +select j.a.b from json2_table order by ts; + ++----------------------------------------------------+ +| json_get(json2_table.j,Utf8("a.b"),Utf8View(NULL)) | ++----------------------------------------------------+ +| 1 | +| -2 | +| 3 | +| -4 | +| | +| | +| s7 | +| 8 | +| | +| 10 | ++----------------------------------------------------+ + +select j.a, j.a.x from json2_table order by ts; + ++--------------------------------------------------+----------------------------------------------------+ +| json_get(json2_table.j,Utf8("a"),Utf8View(NULL)) | json_get(json2_table.j,Utf8("a.x"),Utf8View(NULL)) | ++--------------------------------------------------+----------------------------------------------------+ +| {"b":1,"x":null} | | +| {"b":-2,"x":null} | | +| {"b":3,"x":null} | | +| {"b":-4,"x":null} | | +| {"b":null,"x":null} | | +| | | +| {"b":"s7","x":null} | | +| {"b":8,"x":null} | | +| {"b":null,"x":true} | true | +| {"b":10,"x":null} | | ++--------------------------------------------------+----------------------------------------------------+ + +select j.c, j.y from json2_table order by ts; + ++--------------------------------------------------+--------------------------------------------------+ +| json_get(json2_table.j,Utf8("c"),Utf8View(NULL)) | json_get(json2_table.j,Utf8("y"),Utf8View(NULL)) | ++--------------------------------------------------+--------------------------------------------------+ +| s1 | | +| s2 | | +| s3 | | +| | | +| s5 | | +| s6 | | +| [1] | | +| s8 | | +| s9 | | +| | false | ++--------------------------------------------------+--------------------------------------------------+ + +select j from json2_table order by ts; + +Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected Struct() but found Struct("a": Struct("b": Binary, "x": Boolean), "c": Binary, "d": List(Struct("e": Struct("f": Float64, "g": Float64))), "y": Boolean) at column index 0 + +select * from json2_table order by ts; + +Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected Struct() but found Struct("a": Struct("b": Binary, "x": Boolean), "c": Binary, "d": List(Struct("e": Struct("f": Float64, "g": Float64))), "y": Boolean) at column index 1 + +select j.a.b + 1 from json2_table order by ts; + ++------------------------------------------------------------+ +| json_get(json2_table.j,Utf8("a.b"),Int64(NULL)) + Int64(1) | ++------------------------------------------------------------+ +| 2 | +| -1 | +| 4 | +| -3 | +| | +| | +| | +| 9 | +| | +| 11 | ++------------------------------------------------------------+ + +select abs(j.a.b) from json2_table order by ts; + +Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Function 'abs' expects NativeType::Numeric but received NativeType::String No function matches the given name and argument types 'abs(Utf8View)'. You might need to add explicit type casts. + Candidate functions: + abs(Numeric(1)) + +-- "j.c" is of type "String", "abs" is expected to be all "null"s. +select abs(j.c) from json2_table order by ts; + +Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Function 'abs' expects NativeType::Numeric but received NativeType::String No function matches the given name and argument types 'abs(Utf8View)'. You might need to add explicit type casts. + Candidate functions: + abs(Numeric(1)) + +select j.d from json2_table order by ts; + ++--------------------------------------------------+ +| json_get(json2_table.j,Utf8("d"),Utf8View(NULL)) | ++--------------------------------------------------+ +| [{"e":{"f":0.1,"g":null}}] | +| [{"e":{"f":0.2,"g":null}}] | +| | +| [{"e":{"f":null,"g":-0.4}}] | +| | +| | +| [{"e":{"g":-0.7}}] | +| | +| [{"e":{"g":-0.9}}] | +| | ++--------------------------------------------------+ + drop table json2_table; Affected Rows: 0 diff --git a/tests/cases/standalone/common/types/json/json2.sql b/tests/cases/standalone/common/types/json/json2.sql index 3644f06be4..c7c32fe94b 100644 --- a/tests/cases/standalone/common/types/json/json2.sql +++ b/tests/cases/standalone/common/types/json/json2.sql @@ -22,6 +22,8 @@ values (4, '{"a": {"b": -4}, "d": [{"e": {"g": -0.4}}]}'), admin flush_table('json2_table'); +admin compact_table('json2_table', 'swcs', '86400'); + insert into json2_table values (7, '{"a": {"b": "s7"}, "c": [1], "d": [{"e": {"g": -0.7}}]}'), (8, '{"a": {"b": 8}, "c": "s8"}'); @@ -32,4 +34,23 @@ insert into json2_table values (9, '{"a": {"x": true}, "c": "s9", "d": [{"e": {"g": -0.9}}]}'), (10, '{"a": {"b": 10}, "y": false}'); +select j.a.b from json2_table order by ts; + +select j.a, j.a.x from json2_table order by ts; + +select j.c, j.y from json2_table order by ts; + +select j from json2_table order by ts; + +select * from json2_table order by ts; + +select j.a.b + 1 from json2_table order by ts; + +select abs(j.a.b) from json2_table order by ts; + +-- "j.c" is of type "String", "abs" is expected to be all "null"s. +select abs(j.c) from json2_table order by ts; + +select j.d from json2_table order by ts; + drop table json2_table; diff --git a/tests/cases/standalone/common/types/json/jsonbench.result b/tests/cases/standalone/common/types/json/jsonbench.result new file mode 100644 index 0000000000..3de4caa1b2 --- /dev/null +++ b/tests/cases/standalone/common/types/json/jsonbench.result @@ -0,0 +1,180 @@ +CREATE TABLE bluesky ( + `data` JSON2, + time_us TimestampMicrosecond TIME INDEX +) WITH ('append_mode' = 'true', 'sst_format' = 'flat'); + +Affected Rows: 0 + +INSERT INTO bluesky (time_us, data) +VALUES (1732206349000167, + '{"did":"did:plc:yj3sjq3blzpynh27cumnp5ks","time_us":1732206349000167,"kind":"commit","commit":{"rev":"3lbhtytnn2k2f","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhtyteurk2y","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.095Z","langs":["en"],"reply":{"parent":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"},"root":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"}},"text":"aaaaah.  LIght shines in a corner of WTF...."},"cid":"bafyreidblutgvj75o4q4akzyyejedjj6l3it6hgqwee6jpwv2wqph5fsgm"}}'); + +Affected Rows: 1 + +INSERT INTO bluesky (time_us, data) +VALUES (1732206349000644, + '{"did":"did:plc:3i4xf2v4wcnyktgv6satke64","time_us":1732206349000644,"kind":"commit","commit":{"rev":"3lbhuvzds6d2a","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhuvzdked2a","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.221Z","subject":{"cid":"bafyreidjvrcmckkm765mct5fph36x7kupkfo35rjklbf2k76xkzwyiauge","uri":"at://did:plc:azrv4rcbws6kmcga4fsbphg2/app.bsky.feed.post/3lbgjdpbiec2l"}},"cid":"bafyreia5l5vrkh5oj4cjyhcqby2dprhyvcyofo2q5562tijlae2pzih23m"}}'); + +Affected Rows: 1 + +ADMIN flush_table('bluesky'); + ++------------------------------+ +| ADMIN flush_table('bluesky') | ++------------------------------+ +| 0 | ++------------------------------+ + +INSERT INTO bluesky (time_us, data) +VALUES (1732206349001108, + '{"did":"did:plc:gccfnqqizz4urhchsaie6jft","time_us":1732206349001108,"kind":"commit","commit":{"rev":"3lbhuvze3gi2u","operation":"create","collection":"app.bsky.graph.follow","rkey":"3lbhuvzdtmi2u","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:27:40.923Z","subject":"did:plc:r7cdh4sgzqbfdc6wcdxxti7c"},"cid":"bafyreiew2p6cgirfaj45qoenm4fgumib7xoloclrap3jgkz5es7g7kby3i"}}'); + +Affected Rows: 1 + +INSERT INTO bluesky (time_us, data) +VALUES (1732206349001372, + '{"did":"did:plc:msxqf3twq7abtdw7dbfskphk","time_us":1732206349001372,"kind":"commit","commit":{"rev":"3lbhueija5p22","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhueiizcx22","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:15:58.232Z","subject":{"cid":"bafyreiavpshyqzrlo5m7fqodjhs6jevweqnif4phasiwimv4a7mnsqi2fe","uri":"at://did:plc:fusulxqc52zbrc75fi6xrcof/app.bsky.feed.post/3lbhskq5zn22f"}},"cid":"bafyreidjix4dauj2afjlbzmhj3a7gwftcevvmmy6edww6vrjdbst26rkby"}}'); + +Affected Rows: 1 + +ADMIN flush_table('bluesky'); + ++------------------------------+ +| ADMIN flush_table('bluesky') | ++------------------------------+ +| 0 | ++------------------------------+ + +INSERT INTO bluesky (time_us, data) +VALUES (1732206349001905, + '{"did":"did:plc:l5o3qjrmfztir54cpwlv2eme","time_us":1732206349001905,"kind":"commit","commit":{"rev":"3lbhtytohxc2o","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhtytjqzk2q","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.254Z","langs":["en"],"reply":{"parent":{"cid":"bafyreih35fe2jj3gchmgk4amold4l6sfxd2sby5wrg3jrws5fkdypxrbg4","uri":"at://did:plc:6wx2gg5yqgvmlu35r6y3bk6d/app.bsky.feed.post/3lbhtj2eb4s2o"},"root":{"cid":"bafyreifipyt3vctd4ptuoicvio7rbr5xvjv4afwuggnd2prnmn55mu6luu","uri":"at://did:plc:474ldquxwzrlcvjhhbbk2wte/app.bsky.feed.post/3lbhdzrynik27"}},"text":"okay i take mine back because I hadn’t heard this one yet^^"},"cid":"bafyreigzdsdne3z2xxcakgisieyj7y47hj6eg7lj6v4q25ah5q2qotu5ku"}}'); + +Affected Rows: 1 + +ADMIN compact_table('bluesky', 'swcs', '86400'); + ++-------------------------------------------------+ +| ADMIN compact_table('bluesky', 'swcs', '86400') | ++-------------------------------------------------+ +| 0 | ++-------------------------------------------------+ + +SELECT count(*) FROM bluesky; + ++----------+ +| count(*) | ++----------+ +| 5 | ++----------+ + +-- Query 1: +SELECT data.commit.collection AS event, + count() AS count +FROM bluesky +GROUP BY event +ORDER BY count DESC, event ASC; + ++-----------------------+-------+ +| event | count | ++-----------------------+-------+ +| app.bsky.feed.like | 2 | +| app.bsky.feed.post | 2 | +| app.bsky.graph.follow | 1 | ++-----------------------+-------+ + +-- Query 2: +SELECT data.commit.collection AS event, + count() AS count, + count(DISTINCT data.did) AS users +FROM bluesky +WHERE data.kind = 'commit' AND data.commit.operation = 'create' +GROUP BY event +ORDER BY count DESC, event ASC; + ++-----------------------+-------+-------+ +| event | count | users | ++-----------------------+-------+-------+ +| app.bsky.feed.like | 2 | 2 | +| app.bsky.feed.post | 2 | 2 | +| app.bsky.graph.follow | 1 | 1 | ++-----------------------+-------+-------+ + +-- Query 3: +SELECT data.commit.collection AS event, + date_part('hour', to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) as hour_of_day, + count() AS count +FROM bluesky +WHERE data.kind = 'commit' + AND data.commit.operation = 'create' + AND data.commit.collection in ('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like') +GROUP BY event, hour_of_day +ORDER BY hour_of_day, event; + ++--------------------+-------------+-------+ +| event | hour_of_day | count | ++--------------------+-------------+-------+ +| app.bsky.feed.like | 16 | 2 | +| app.bsky.feed.post | 16 | 2 | ++--------------------+-------------+-------+ + +-- Query 4: +SELECT data.did::String as user_id, + min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) AS first_post_ts +FROM bluesky +WHERE data.kind = 'commit' + AND data.commit.operation = 'create' + AND data.commit.collection = 'app.bsky.feed.post' +GROUP BY user_id +ORDER BY first_post_ts ASC, user_id DESC +LIMIT 3; + ++----------------------------------+----------------------------+ +| user_id | first_post_ts | ++----------------------------------+----------------------------+ +| did:plc:yj3sjq3blzpynh27cumnp5ks | 2024-11-21T16:25:49.000167 | +| did:plc:l5o3qjrmfztir54cpwlv2eme | 2024-11-21T16:25:49.001905 | ++----------------------------------+----------------------------+ + +-- Query 5: +SELECT data.did::String as user_id, + date_part( + 'epoch', + max(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) - + min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) + ) AS activity_span +FROM bluesky +WHERE data.kind = 'commit' + AND data.commit.operation = 'create' + AND data.commit.collection = 'app.bsky.feed.post' +GROUP BY user_id +ORDER BY activity_span DESC, user_id DESC +LIMIT 3; + ++----------------------------------+---------------+ +| user_id | activity_span | ++----------------------------------+---------------+ +| did:plc:yj3sjq3blzpynh27cumnp5ks | 0.0 | +| did:plc:l5o3qjrmfztir54cpwlv2eme | 0.0 | ++----------------------------------+---------------+ + +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN +SELECT date_part('hour', to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) as hour_of_day +FROM bluesky; + ++---------------+--------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+--------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false, remote_input=[ | +| | Projection: date_part(Utf8("hour"), to_timestamp_micros(json2_get(bluesky.data, Utf8("time_us"), Int64(NULL)))) AS hour_of_day | +| | TableScan: bluesky | +| | ]] | +| physical_plan | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+--------------------------------------------------------------------------------------------------------------------------------+ + +DROP TABLE bluesky; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/types/json/jsonbench.sql b/tests/cases/standalone/common/types/json/jsonbench.sql new file mode 100644 index 0000000000..8d25605ded --- /dev/null +++ b/tests/cases/standalone/common/types/json/jsonbench.sql @@ -0,0 +1,92 @@ +CREATE TABLE bluesky ( + `data` JSON2, + time_us TimestampMicrosecond TIME INDEX +) WITH ('append_mode' = 'true', 'sst_format' = 'flat'); + +INSERT INTO bluesky (time_us, data) +VALUES (1732206349000167, + '{"did":"did:plc:yj3sjq3blzpynh27cumnp5ks","time_us":1732206349000167,"kind":"commit","commit":{"rev":"3lbhtytnn2k2f","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhtyteurk2y","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.095Z","langs":["en"],"reply":{"parent":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"},"root":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"}},"text":"aaaaah.  LIght shines in a corner of WTF...."},"cid":"bafyreidblutgvj75o4q4akzyyejedjj6l3it6hgqwee6jpwv2wqph5fsgm"}}'); + +INSERT INTO bluesky (time_us, data) +VALUES (1732206349000644, + '{"did":"did:plc:3i4xf2v4wcnyktgv6satke64","time_us":1732206349000644,"kind":"commit","commit":{"rev":"3lbhuvzds6d2a","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhuvzdked2a","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.221Z","subject":{"cid":"bafyreidjvrcmckkm765mct5fph36x7kupkfo35rjklbf2k76xkzwyiauge","uri":"at://did:plc:azrv4rcbws6kmcga4fsbphg2/app.bsky.feed.post/3lbgjdpbiec2l"}},"cid":"bafyreia5l5vrkh5oj4cjyhcqby2dprhyvcyofo2q5562tijlae2pzih23m"}}'); + +ADMIN flush_table('bluesky'); + +INSERT INTO bluesky (time_us, data) +VALUES (1732206349001108, + '{"did":"did:plc:gccfnqqizz4urhchsaie6jft","time_us":1732206349001108,"kind":"commit","commit":{"rev":"3lbhuvze3gi2u","operation":"create","collection":"app.bsky.graph.follow","rkey":"3lbhuvzdtmi2u","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:27:40.923Z","subject":"did:plc:r7cdh4sgzqbfdc6wcdxxti7c"},"cid":"bafyreiew2p6cgirfaj45qoenm4fgumib7xoloclrap3jgkz5es7g7kby3i"}}'); + +INSERT INTO bluesky (time_us, data) +VALUES (1732206349001372, + '{"did":"did:plc:msxqf3twq7abtdw7dbfskphk","time_us":1732206349001372,"kind":"commit","commit":{"rev":"3lbhueija5p22","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhueiizcx22","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:15:58.232Z","subject":{"cid":"bafyreiavpshyqzrlo5m7fqodjhs6jevweqnif4phasiwimv4a7mnsqi2fe","uri":"at://did:plc:fusulxqc52zbrc75fi6xrcof/app.bsky.feed.post/3lbhskq5zn22f"}},"cid":"bafyreidjix4dauj2afjlbzmhj3a7gwftcevvmmy6edww6vrjdbst26rkby"}}'); + +ADMIN flush_table('bluesky'); + +INSERT INTO bluesky (time_us, data) +VALUES (1732206349001905, + '{"did":"did:plc:l5o3qjrmfztir54cpwlv2eme","time_us":1732206349001905,"kind":"commit","commit":{"rev":"3lbhtytohxc2o","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhtytjqzk2q","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.254Z","langs":["en"],"reply":{"parent":{"cid":"bafyreih35fe2jj3gchmgk4amold4l6sfxd2sby5wrg3jrws5fkdypxrbg4","uri":"at://did:plc:6wx2gg5yqgvmlu35r6y3bk6d/app.bsky.feed.post/3lbhtj2eb4s2o"},"root":{"cid":"bafyreifipyt3vctd4ptuoicvio7rbr5xvjv4afwuggnd2prnmn55mu6luu","uri":"at://did:plc:474ldquxwzrlcvjhhbbk2wte/app.bsky.feed.post/3lbhdzrynik27"}},"text":"okay i take mine back because I hadn’t heard this one yet^^"},"cid":"bafyreigzdsdne3z2xxcakgisieyj7y47hj6eg7lj6v4q25ah5q2qotu5ku"}}'); + +ADMIN compact_table('bluesky', 'swcs', '86400'); + +SELECT count(*) FROM bluesky; + +-- Query 1: +SELECT data.commit.collection AS event, + count() AS count +FROM bluesky +GROUP BY event +ORDER BY count DESC, event ASC; + +-- Query 2: +SELECT data.commit.collection AS event, + count() AS count, + count(DISTINCT data.did) AS users +FROM bluesky +WHERE data.kind = 'commit' AND data.commit.operation = 'create' +GROUP BY event +ORDER BY count DESC, event ASC; + +-- Query 3: +SELECT data.commit.collection AS event, + date_part('hour', to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) as hour_of_day, + count() AS count +FROM bluesky +WHERE data.kind = 'commit' + AND data.commit.operation = 'create' + AND data.commit.collection in ('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like') +GROUP BY event, hour_of_day +ORDER BY hour_of_day, event; + +-- Query 4: +SELECT data.did::String as user_id, + min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) AS first_post_ts +FROM bluesky +WHERE data.kind = 'commit' + AND data.commit.operation = 'create' + AND data.commit.collection = 'app.bsky.feed.post' +GROUP BY user_id +ORDER BY first_post_ts ASC, user_id DESC +LIMIT 3; + +-- Query 5: +SELECT data.did::String as user_id, + date_part( + 'epoch', + max(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) - + min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) + ) AS activity_span +FROM bluesky +WHERE data.kind = 'commit' + AND data.commit.operation = 'create' + AND data.commit.collection = 'app.bsky.feed.post' +GROUP BY user_id +ORDER BY activity_span DESC, user_id DESC +LIMIT 3; + +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN +SELECT date_part('hour', to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) as hour_of_day +FROM bluesky; + +DROP TABLE bluesky;