From 016d5df963ce854359bbc79eb53e78af8019a56a Mon Sep 17 00:00:00 2001 From: luofucong Date: Wed, 11 Mar 2026 19:54:24 +0800 Subject: [PATCH] query driven concretize json2 datatype Signed-off-by: luofucong --- src/datatypes/src/json.rs | 1 + src/datatypes/src/json/requirement.rs | 71 ++++++ src/datatypes/src/vectors/json/array.rs | 51 ++-- src/mito2/src/read/scan_region.rs | 60 ++++- src/query/src/dummy_catalog.rs | 6 + src/query/src/optimizer.rs | 1 + src/query/src/optimizer/json2_scan_hint.rs | 237 ++++++++++++++++++ src/query/src/query_engine/state.rs | 2 + src/store-api/src/storage/requests.rs | 12 + .../standalone/common/types/json/json2.result | 34 +++ .../standalone/common/types/json/json2.sql | 4 + 11 files changed, 450 insertions(+), 29 deletions(-) create mode 100644 src/datatypes/src/json/requirement.rs create mode 100644 src/query/src/optimizer/json2_scan_hint.rs diff --git a/src/datatypes/src/json.rs b/src/datatypes/src/json.rs index 3bebbf89aa..b2a277098d 100644 --- a/src/datatypes/src/json.rs +++ b/src/datatypes/src/json.rs @@ -19,6 +19,7 @@ //! The struct will carry all the fields of the Json object. We will not flatten any json object in this implementation. //! +pub mod requirement; pub mod value; use std::collections::{BTreeMap, HashSet}; diff --git a/src/datatypes/src/json/requirement.rs b/src/datatypes/src/json/requirement.rs new file mode 100644 index 0000000000..0faa3f191e --- /dev/null +++ b/src/datatypes/src/json/requirement.rs @@ -0,0 +1,71 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; +use std::sync::Arc; + +use crate::data_type::ConcreteDataType; +use crate::types::{StructField, StructType}; + +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct JsonPathTarget { + root: JsonPathTargetNode, +} + +#[derive(Debug, Clone, Default, PartialEq, Eq)] +struct JsonPathTargetNode { + children: BTreeMap, + string_leaf: bool, +} + +impl JsonPathTarget { + pub fn require_path(&mut self, path: &str) { + let mut current = &mut self.root; + for segment in path.split('.') { + current = current.children.entry(segment.to_string()).or_default(); + } + current.string_leaf = true; + } + + pub fn is_empty(&self) -> bool { + self.root.children.is_empty() && !self.root.string_leaf + } + + pub fn build_type(&self) -> Option { + if self.is_empty() { + None + } else { + Some(ConcreteDataType::Struct(self.root.build_struct_type())) + } + } +} + +impl JsonPathTargetNode { + fn build_data_type(&self) -> ConcreteDataType { + if self.children.is_empty() { + ConcreteDataType::string_datatype() + } else { + ConcreteDataType::Struct(self.build_struct_type()) + } + } + + fn build_struct_type(&self) -> StructType { + let fields = self + .children + .iter() + .map(|(name, child)| StructField::new(name.clone(), child.build_data_type(), true)) + .collect::>(); + StructType::new(Arc::new(fields)) + } +} diff --git a/src/datatypes/src/vectors/json/array.rs b/src/datatypes/src/vectors/json/array.rs index efd350485f..872235213b 100644 --- a/src/datatypes/src/vectors/json/array.rs +++ b/src/datatypes/src/vectors/json/array.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; use std::sync::Arc; use arrow::compute; @@ -19,7 +20,7 @@ use arrow::util::display::{ArrayFormatter, FormatOptions}; use arrow_array::cast::AsArray; use arrow_array::{Array, ArrayRef, StructArray, new_null_array}; use arrow_schema::DataType; -use snafu::{ResultExt, ensure}; +use snafu::ResultExt; use crate::arrow_array::StringArray; use crate::error::{AlignJsonArraySnafu, ArrowComputeSnafu, Result}; @@ -64,43 +65,37 @@ impl JsonArray<'_> { while i < expect_fields.len() && j < array_fields.len() { let expect_field = &expect_fields[i]; let array_field = &array_fields[j]; - if expect_field.name() == array_field.name() { - if expect_field.data_type() == array_field.data_type() { - aligned.push(array_columns[j].clone()); - } else { - let array = JsonArray::from(&array_columns[j]); - if matches!(expect_field.data_type(), DataType::Struct(_)) { - // A `StructArray` in a JSON array must be another JSON array. - // (Like a nested JSON object in a JSON value.) - aligned.push(array.try_align(expect_field.data_type())?); + match expect_field.name().cmp(array_field.name()) { + Ordering::Equal => { + if expect_field.data_type() == array_field.data_type() { + aligned.push(array_columns[j].clone()); } else { - aligned.push(array.try_cast(expect_field.data_type())?); + let array = JsonArray::from(&array_columns[j]); + if matches!(expect_field.data_type(), DataType::Struct(_)) { + // A `StructArray` in a JSON array must be another JSON array. + // (Like a nested JSON object in a JSON value.) + aligned.push(array.try_align(expect_field.data_type())?); + } else { + aligned.push(array.try_cast(expect_field.data_type())?); + } } + i += 1; + j += 1; + } + Ordering::Less => { + aligned.push(new_null_array(expect_field.data_type(), struct_array.len())); + i += 1; + } + Ordering::Greater => { + j += 1; } - - j += 1; - } else { - aligned.push(new_null_array(expect_field.data_type(), struct_array.len())); } - i += 1; } if i < expect_fields.len() { for field in &expect_fields[i..] { aligned.push(new_null_array(field.data_type(), struct_array.len())); } } - ensure!( - j == array_fields.len(), - AlignJsonArraySnafu { - reason: format!( - "this json array has more fields {:?}", - array_fields[j..] - .iter() - .map(|x| x.name()) - .collect::>(), - ) - } - ); let json_array = StructArray::try_new( expect_fields.clone(), diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 60a667c57e..ed12c9a0e5 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -15,7 +15,7 @@ //! Scans a region according to the scan request. use std::borrow::Cow; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::fmt; use std::num::NonZeroU64; use std::sync::Arc; @@ -556,6 +556,7 @@ impl ScanRegion { .with_merge_mode(self.version.options.merge_mode()) .with_series_row_selector(self.request.series_row_selector) .with_distribution(self.request.distribution) + .with_json2_column_types(self.request.json2_column_types.clone()) .with_flat_format(flat_format); #[cfg(feature = "vector_index")] let input = input @@ -801,6 +802,51 @@ impl ScanRegion { } fn maybe_concretize_flat_json2_schema(input: ScanInput) -> ScanInput { + apply_query_driven_json2_schema(input) + // apply_data_driven_json2_schema(input) +} + +fn apply_query_driven_json2_schema(input: ScanInput) -> ScanInput { + let Some(flat_mapper) = input.mapper.as_flat() else { + return input; + }; + if input.json2_column_types.is_empty() { + return input; + } + + let output_schema = flat_mapper.output_schema(); + let output_arrow_schema = output_schema.arrow_schema(); + let mut column_schemas = output_schema.column_schemas().to_vec(); + let mut changed = false; + for (idx, column_schema) in column_schemas.iter_mut().enumerate() { + let output_field = &output_arrow_schema.fields()[idx]; + if !is_json_extension_type(output_field) { + continue; + } + + let Some(target_type) = input.json2_column_types.get(&column_schema.name) else { + continue; + }; + if &column_schema.data_type != target_type { + column_schema.data_type = target_type.clone(); + changed = true; + } + } + + if !changed { + return input; + } + + let mut mapper = Arc::unwrap_or_clone(input.mapper); + mapper.with_flat_output_schema(Arc::new(Schema::new(column_schemas))); + ScanInput { + mapper: Arc::new(mapper), + ..input + } +} + +#[expect(unused)] +fn apply_data_driven_json2_schema(input: ScanInput) -> ScanInput { let Some(flat_mapper) = input.mapper.as_flat() else { return input; }; @@ -914,6 +960,8 @@ pub struct ScanInput { pub(crate) series_row_selector: Option, /// Hint for the required distribution of the scanner. pub(crate) distribution: Option, + /// Query-driven target types for JSON2 columns. + json2_column_types: HashMap, /// Whether to use flat format. pub(crate) flat_format: bool, /// Whether this scan is for compaction. @@ -952,6 +1000,7 @@ impl ScanInput { merge_mode: MergeMode::default(), series_row_selector: None, distribution: None, + json2_column_types: HashMap::new(), flat_format: false, compaction: false, #[cfg(feature = "enterprise")] @@ -988,6 +1037,15 @@ impl ScanInput { self } + #[must_use] + fn with_json2_column_types( + mut self, + json2_column_types: HashMap, + ) -> Self { + self.json2_column_types = json2_column_types; + self + } + /// Sets cache for this query. #[must_use] pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self { diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs index 239cf7cea8..104462a5db 100644 --- a/src/query/src/dummy_catalog.rs +++ b/src/query/src/dummy_catalog.rs @@ -15,6 +15,7 @@ //! Dummy catalog for region server. use std::any::Any; +use std::collections::HashMap; use std::fmt; use std::sync::{Arc, Mutex}; @@ -30,6 +31,7 @@ use datafusion::physical_plan::ExecutionPlan; use datafusion_common::DataFusionError; use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; use datatypes::arrow::datatypes::SchemaRef; +use datatypes::data_type::ConcreteDataType; use futures::stream::BoxStream; use session::context::{QueryContext, QueryContextRef}; use snafu::ResultExt; @@ -266,6 +268,10 @@ impl DummyTableProvider { self.scan_request.lock().unwrap().vector_search.clone() } + pub fn with_json2_type_hint(&self, json2_column_types: &HashMap) { + self.scan_request.lock().unwrap().json2_column_types = json2_column_types.clone(); + } + pub fn with_sequence(&self, sequence: u64) { self.scan_request.lock().unwrap().memtable_max_sequence = Some(sequence); } diff --git a/src/query/src/optimizer.rs b/src/query/src/optimizer.rs index 4259b587ba..9a3a46d066 100644 --- a/src/query/src/optimizer.rs +++ b/src/query/src/optimizer.rs @@ -14,6 +14,7 @@ pub mod constant_term; pub mod count_wildcard; +pub mod json2_scan_hint; pub mod parallelize_scan; pub mod pass_distribution; pub mod remove_duplicate; diff --git a/src/query/src/optimizer/json2_scan_hint.rs b/src/query/src/optimizer/json2_scan_hint.rs new file mode 100644 index 0000000000..c8f920c6d9 --- /dev/null +++ b/src/query/src/optimizer/json2_scan_hint.rs @@ -0,0 +1,237 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet}; + +use common_function::scalars::json::json2_get::Json2GetFunction; +use datafusion::datasource::DefaultTableSource; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; +use datafusion_common::{Result, ScalarValue, TableReference}; +use datafusion_expr::expr::ScalarFunction; +use datafusion_expr::{Expr, LogicalPlan}; +use datafusion_optimizer::{OptimizerConfig, OptimizerRule}; +use datatypes::data_type::ConcreteDataType; +use datatypes::json::requirement::JsonPathTarget; +use datatypes::types::JsonFormat; + +use crate::dummy_catalog::DummyTableProvider; + +#[derive(Debug)] +pub struct Json2ScanHintRule; + +impl OptimizerRule for Json2ScanHintRule { + fn name(&self) -> &str { + "Json2ScanHintRule" + } + + fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result> { + let collector = Json2RequirementCollector::collect(&plan)?; + if collector.is_empty() { + return Ok(Transformed::no(plan)); + } + + plan.transform_down(&mut |plan| match &plan { + LogicalPlan::TableScan(table_scan) => { + let Some(source) = table_scan + .source + .as_any() + .downcast_ref::() + else { + return Ok(Transformed::no(plan)); + }; + + let Some(adapter) = source + .table_provider + .as_any() + .downcast_ref::() + else { + return Ok(Transformed::no(plan)); + }; + + let hints = collector.deduce_required_json2_type( + &table_scan.table_name, + &adapter.region_metadata().schema, + ); + adapter.with_json2_type_hint(&hints); + Ok(Transformed::yes(plan)) + } + _ => Ok(Transformed::no(plan)), + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct Json2ColumnKey { + relation: Option, + name: String, +} + +#[derive(Debug, Default)] +struct Json2RequirementCollector { + path_targets: HashMap, + full_columns: HashSet, +} + +impl Json2RequirementCollector { + fn collect(plan: &LogicalPlan) -> Result { + let mut collector = Self::default(); + plan.apply(|node| { + for expr in node.expressions() { + let _ = expr.apply(|expr| { + if let Some((column, path)) = extract_json2_get(expr) { + if !collector.matches_full_column(&column) { + collector + .path_targets + .entry(column) + .or_default() + .require_path(&path); + } + return Ok(TreeNodeRecursion::Stop); + } + + if let Expr::Column(column) = expr { + let key = Json2ColumnKey { + relation: column.relation.clone(), + name: column.name.clone(), + }; + collector.path_targets.remove(&key); + collector.full_columns.insert(key); + } + Ok(TreeNodeRecursion::Continue) + })?; + } + Ok(TreeNodeRecursion::Continue) + })?; + Ok(collector) + } + + fn is_empty(&self) -> bool { + self.path_targets.is_empty() && self.full_columns.is_empty() + } + + fn matches_full_column(&self, column: &Json2ColumnKey) -> bool { + self.full_columns.contains(column) + || self.full_columns.contains(&Json2ColumnKey { + relation: None, + name: column.name.clone(), + }) + } + + fn deduce_required_json2_type( + &self, + table_name: &TableReference, + schema: &datatypes::schema::SchemaRef, + ) -> HashMap { + let mut types = HashMap::new(); + + for column_schema in schema.column_schemas() { + let ConcreteDataType::Json(json_type) = &column_schema.data_type else { + continue; + }; + if !matches!(json_type.format, JsonFormat::Json2) { + continue; + } + + let matching_keys = self + .path_targets + .iter() + .filter(|(key, _)| { + key.name == column_schema.name + && key.relation.as_ref().is_none_or(|x| x == table_name) + }) + .map(|(_, target)| target.clone()) + .collect::>(); + if matching_keys.is_empty() { + continue; + } + + let has_full_column = self.full_columns.iter().any(|key| { + key.name == column_schema.name + && key.relation.as_ref().is_none_or(|x| x == table_name) + }); + if has_full_column { + continue; + } + + let mut merged = JsonPathTarget::default(); + for target in matching_keys { + if let Some(data_type) = target.build_type() { + merge_path_target_from_type(&mut merged, &data_type, ""); + } + } + if let Some(data_type) = merged.build_type() { + let _ = types.insert(column_schema.name.clone(), data_type); + } + } + + types + } +} + +fn extract_json2_get(expr: &Expr) -> Option<(Json2ColumnKey, String)> { + let Expr::ScalarFunction(ScalarFunction { func, args }) = expr else { + return None; + }; + if func.name() != Json2GetFunction::NAME || args.len() != 2 { + return None; + } + + let Expr::Column(column) = &args[0] else { + return None; + }; + + let path = match &args[1] { + Expr::Literal(ScalarValue::Utf8(Some(path)), _) + | Expr::Literal(ScalarValue::LargeUtf8(Some(path)), _) + | Expr::Literal(ScalarValue::Utf8View(Some(path)), _) => path.clone(), + _ => return None, + }; + + Some(( + Json2ColumnKey { + relation: column.relation.clone(), + name: column.name.clone(), + }, + path, + )) +} + +fn merge_path_target_from_type( + target: &mut JsonPathTarget, + data_type: &ConcreteDataType, + prefix: &str, +) { + match data_type { + ConcreteDataType::Struct(struct_type) => { + let fields = struct_type.fields(); + for field in fields.iter() { + let path = if prefix.is_empty() { + field.name().to_string() + } else { + format!("{prefix}.{}", field.name()) + }; + merge_path_target_from_type(target, field.data_type(), &path); + } + } + _ => { + if !prefix.is_empty() { + target.require_path(prefix); + } + } + } +} diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index a45fc4c896..7877ff2f2c 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -61,6 +61,7 @@ use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_B use crate::optimizer::ExtensionAnalyzerRule; use crate::optimizer::constant_term::MatchesConstantTermOptimizer; use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule; +use crate::optimizer::json2_scan_hint::Json2ScanHintRule; use crate::optimizer::parallelize_scan::ParallelizeScan; use crate::optimizer::pass_distribution::PassDistribution; use crate::optimizer::remove_duplicate::RemoveDuplicate; @@ -171,6 +172,7 @@ impl QueryEngineState { analyzer.rules.push(Arc::new(FixStateUdafOrderingAnalyzer)); let mut optimizer = Optimizer::new(); + optimizer.rules.push(Arc::new(Json2ScanHintRule)); optimizer.rules.push(Arc::new(ScanHintRule)); // add physical optimizer diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index 6725de92e3..55c8b48a99 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -12,12 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::fmt::{Display, Formatter}; use common_error::ext::BoxedError; use common_recordbatch::OrderOption; use datafusion_expr::expr::Expr; // Re-export vector types from datatypes to avoid duplication +use datatypes::data_type::ConcreteDataType; pub use datatypes::schema::{VectorDistanceMetric, VectorIndexEngineType}; use strum::Display; @@ -126,6 +128,8 @@ pub struct ScanRequest { pub vector_search: Option, /// Whether to force reading region data in flat format. pub force_flat_format: bool, + /// Optional target types for query-driven JSON2 concretization. + pub json2_column_types: HashMap, } impl Display for ScanRequest { @@ -216,6 +220,14 @@ impl Display for ScanRequest { self.force_flat_format )?; } + if !self.json2_column_types.is_empty() { + write!( + f, + "{}json2_column_types: {:?}", + delimiter.as_str(), + self.json2_column_types + )?; + } write!(f, " }}") } } diff --git a/tests/cases/standalone/common/types/json/json2.result b/tests/cases/standalone/common/types/json/json2.result index f0644cbcab..2f4c5d7435 100644 --- a/tests/cases/standalone/common/types/json/json2.result +++ b/tests/cases/standalone/common/types/json/json2.result @@ -123,6 +123,40 @@ select j.y from json2_table order by ts; | false | +------------------------------------+ +select j.c, j.a.b from json2_table order by ts; + ++------------------------------------+--------------------------------------+ +| json2_get(json2_table.j,Utf8("c")) | json2_get(json2_table.j,Utf8("a.b")) | ++------------------------------------+--------------------------------------+ +| s1 | 1 | +| s2 | 2 | +| s3 | 3 | +| | 4 | +| s5 | | +| s6 | | +| [1] | s7 | +| s8 | 8 | +| s9 | | +| | 10 | ++------------------------------------+--------------------------------------+ + +select j.a.b, j.a from json2_table order by ts; + ++--------------------------------------+------------------------------------+ +| json2_get(json2_table.j,Utf8("a.b")) | json2_get(json2_table.j,Utf8("a")) | ++--------------------------------------+------------------------------------+ +| 1 | {b: 1} | +| 2 | {b: 2} | +| 3 | {b: 3} | +| 4 | {b: 4} | +| | {b: } | +| | | +| s7 | {b: s7} | +| 8 | {b: 8} | +| | {b: } | +| 10 | {b: 10} | ++--------------------------------------+------------------------------------+ + drop table json2_table; Affected Rows: 0 diff --git a/tests/cases/standalone/common/types/json/json2.sql b/tests/cases/standalone/common/types/json/json2.sql index a6ea4eb32d..68be681f4f 100644 --- a/tests/cases/standalone/common/types/json/json2.sql +++ b/tests/cases/standalone/common/types/json/json2.sql @@ -39,4 +39,8 @@ select j.c from json2_table order by ts; select j.y from json2_table order by ts; +select j.c, j.a.b from json2_table order by ts; + +select j.a.b, j.a from json2_table order by ts; + drop table json2_table;