query driven concretize json2 datatype

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-03-11 19:54:24 +08:00
parent caac2f1ac8
commit 016d5df963
11 changed files with 450 additions and 29 deletions

View File

@@ -19,6 +19,7 @@
//! The struct will carry all the fields of the Json object. We will not flatten any json object in this implementation.
//!
pub mod requirement;
pub mod value;
use std::collections::{BTreeMap, HashSet};

View File

@@ -0,0 +1,71 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::sync::Arc;
use crate::data_type::ConcreteDataType;
use crate::types::{StructField, StructType};
/// A set of required JSON paths, stored as a tree of path segments.
///
/// Paths are added with `require_path` and turned into a nested struct data
/// type with `build_type`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct JsonPathTarget {
// Root of the segment tree; its children are keyed by the first segment of each path.
root: JsonPathTargetNode,
}
/// One node of the path tree used by `JsonPathTarget`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct JsonPathTargetNode {
// Child nodes keyed by the next path segment (a `BTreeMap`, so iteration is in key order).
children: BTreeMap<String, JsonPathTargetNode>,
// Set to `true` when a required path ends exactly at this node.
string_leaf: bool,
}
impl JsonPathTarget {
    /// Registers `path` as required.
    ///
    /// The path is split on `.`; a node is created for every segment that does
    /// not exist yet, and the node of the last segment is flagged as a leaf.
    pub fn require_path(&mut self, path: &str) {
        let leaf = path.split('.').fold(&mut self.root, |node, segment| {
            node.children.entry(segment.to_string()).or_default()
        });
        leaf.string_leaf = true;
    }

    /// Returns `true` when the root has no children and is not itself a leaf.
    pub fn is_empty(&self) -> bool {
        let root = &self.root;
        !root.string_leaf && root.children.is_empty()
    }

    /// Builds the struct data type covering all registered paths.
    ///
    /// Returns `None` when this target is empty.
    pub fn build_type(&self) -> Option<ConcreteDataType> {
        if self.is_empty() {
            return None;
        }
        let struct_type = self.root.build_struct_type();
        Some(ConcreteDataType::Struct(struct_type))
    }
}
impl JsonPathTargetNode {
    /// Data type of this node: the string data type for a node without
    /// children, otherwise a struct built from its children.
    fn build_data_type(&self) -> ConcreteDataType {
        if !self.children.is_empty() {
            return ConcreteDataType::Struct(self.build_struct_type());
        }
        ConcreteDataType::string_datatype()
    }

    /// Struct type with one field per child, in the key order of the map.
    ///
    /// Every field is created with `true` as the last `StructField::new`
    /// argument (presumably "nullable" -- confirm against `StructField`).
    fn build_struct_type(&self) -> StructType {
        let mut fields = Vec::with_capacity(self.children.len());
        for (name, child) in &self.children {
            fields.push(StructField::new(
                name.clone(),
                child.build_data_type(),
                true,
            ));
        }
        StructType::new(Arc::new(fields))
    }
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp::Ordering;
use std::sync::Arc;
use arrow::compute;
@@ -19,7 +20,7 @@ use arrow::util::display::{ArrayFormatter, FormatOptions};
use arrow_array::cast::AsArray;
use arrow_array::{Array, ArrayRef, StructArray, new_null_array};
use arrow_schema::DataType;
use snafu::{ResultExt, ensure};
use snafu::ResultExt;
use crate::arrow_array::StringArray;
use crate::error::{AlignJsonArraySnafu, ArrowComputeSnafu, Result};
@@ -64,43 +65,37 @@ impl JsonArray<'_> {
while i < expect_fields.len() && j < array_fields.len() {
let expect_field = &expect_fields[i];
let array_field = &array_fields[j];
if expect_field.name() == array_field.name() {
if expect_field.data_type() == array_field.data_type() {
aligned.push(array_columns[j].clone());
} else {
let array = JsonArray::from(&array_columns[j]);
if matches!(expect_field.data_type(), DataType::Struct(_)) {
// A `StructArray` in a JSON array must be another JSON array.
// (Like a nested JSON object in a JSON value.)
aligned.push(array.try_align(expect_field.data_type())?);
match expect_field.name().cmp(array_field.name()) {
Ordering::Equal => {
if expect_field.data_type() == array_field.data_type() {
aligned.push(array_columns[j].clone());
} else {
aligned.push(array.try_cast(expect_field.data_type())?);
let array = JsonArray::from(&array_columns[j]);
if matches!(expect_field.data_type(), DataType::Struct(_)) {
// A `StructArray` in a JSON array must be another JSON array.
// (Like a nested JSON object in a JSON value.)
aligned.push(array.try_align(expect_field.data_type())?);
} else {
aligned.push(array.try_cast(expect_field.data_type())?);
}
}
i += 1;
j += 1;
}
Ordering::Less => {
aligned.push(new_null_array(expect_field.data_type(), struct_array.len()));
i += 1;
}
Ordering::Greater => {
j += 1;
}
j += 1;
} else {
aligned.push(new_null_array(expect_field.data_type(), struct_array.len()));
}
i += 1;
}
if i < expect_fields.len() {
for field in &expect_fields[i..] {
aligned.push(new_null_array(field.data_type(), struct_array.len()));
}
}
ensure!(
j == array_fields.len(),
AlignJsonArraySnafu {
reason: format!(
"this json array has more fields {:?}",
array_fields[j..]
.iter()
.map(|x| x.name())
.collect::<Vec<_>>(),
)
}
);
let json_array = StructArray::try_new(
expect_fields.clone(),

View File

@@ -15,7 +15,7 @@
//! Scans a region according to the scan request.
use std::borrow::Cow;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::num::NonZeroU64;
use std::sync::Arc;
@@ -556,6 +556,7 @@ impl ScanRegion {
.with_merge_mode(self.version.options.merge_mode())
.with_series_row_selector(self.request.series_row_selector)
.with_distribution(self.request.distribution)
.with_json2_column_types(self.request.json2_column_types.clone())
.with_flat_format(flat_format);
#[cfg(feature = "vector_index")]
let input = input
@@ -801,6 +802,51 @@ impl ScanRegion {
}
/// Concretizes the JSON2 columns of the flat output schema carried by `input`.
///
/// At the moment this always delegates to the query-driven strategy.
fn maybe_concretize_flat_json2_schema(input: ScanInput) -> ScanInput {
apply_query_driven_json2_schema(input)
// The data-driven strategy is still defined in this file but is not called here:
// apply_data_driven_json2_schema(input)
}
/// Replaces the data types of JSON extension columns in the flat output schema
/// with the target types stored in `input.json2_column_types`.
///
/// `input` is returned unchanged when there are no target types, when the
/// mapper is not a flat mapper, or when no column type would actually change.
fn apply_query_driven_json2_schema(input: ScanInput) -> ScanInput {
    if input.json2_column_types.is_empty() {
        return input;
    }
    let Some(flat_mapper) = input.mapper.as_flat() else {
        return input;
    };

    let output_schema = flat_mapper.output_schema();
    let output_arrow_schema = output_schema.arrow_schema();
    let mut column_schemas = output_schema.column_schemas().to_vec();

    let mut schema_changed = false;
    for (idx, column_schema) in column_schemas.iter_mut().enumerate() {
        // Only columns whose arrow field is a JSON extension type are candidates.
        if !is_json_extension_type(&output_arrow_schema.fields()[idx]) {
            continue;
        }
        match input.json2_column_types.get(&column_schema.name) {
            Some(target_type) if &column_schema.data_type != target_type => {
                column_schema.data_type = target_type.clone();
                schema_changed = true;
            }
            // No target for this column, or it already has the target type.
            _ => {}
        }
    }
    if !schema_changed {
        return input;
    }

    // Install the adjusted schema on an owned copy of the mapper.
    let concretized_schema = Arc::new(Schema::new(column_schemas));
    let mut mapper = Arc::unwrap_or_clone(input.mapper);
    mapper.with_flat_output_schema(concretized_schema);
    ScanInput {
        mapper: Arc::new(mapper),
        ..input
    }
}
#[expect(unused)]
fn apply_data_driven_json2_schema(input: ScanInput) -> ScanInput {
let Some(flat_mapper) = input.mapper.as_flat() else {
return input;
};
@@ -914,6 +960,8 @@ pub struct ScanInput {
pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
/// Hint for the required distribution of the scanner.
pub(crate) distribution: Option<TimeSeriesDistribution>,
/// Query-driven target types for JSON2 columns.
json2_column_types: HashMap<String, ConcreteDataType>,
/// Whether to use flat format.
pub(crate) flat_format: bool,
/// Whether this scan is for compaction.
@@ -952,6 +1000,7 @@ impl ScanInput {
merge_mode: MergeMode::default(),
series_row_selector: None,
distribution: None,
json2_column_types: HashMap::new(),
flat_format: false,
compaction: false,
#[cfg(feature = "enterprise")]
@@ -988,6 +1037,15 @@ impl ScanInput {
self
}
/// Sets the query-driven target types for JSON2 columns and returns the updated input.
#[must_use]
fn with_json2_column_types(
mut self,
json2_column_types: HashMap<String, ConcreteDataType>,
) -> Self {
self.json2_column_types = json2_column_types;
self
}
/// Sets cache for this query.
#[must_use]
pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {

View File

@@ -15,6 +15,7 @@
//! Dummy catalog for region server.
use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, Mutex};
@@ -30,6 +31,7 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::DataFusionError;
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::data_type::ConcreteDataType;
use futures::stream::BoxStream;
use session::context::{QueryContext, QueryContextRef};
use snafu::ResultExt;
@@ -266,6 +268,10 @@ impl DummyTableProvider {
self.scan_request.lock().unwrap().vector_search.clone()
}
/// Stores a copy of `json2_column_types` in the scan request, replacing any
/// previously stored hints.
pub fn with_json2_type_hint(&self, json2_column_types: &HashMap<String, ConcreteDataType>) {
    let hints = json2_column_types.clone();
    let mut request = self.scan_request.lock().unwrap();
    request.json2_column_types = hints;
}
/// Sets `memtable_max_sequence` of the scan request to `Some(sequence)`.
pub fn with_sequence(&self, sequence: u64) {
    let mut request = self.scan_request.lock().unwrap();
    request.memtable_max_sequence = Some(sequence);
}

View File

@@ -14,6 +14,7 @@
pub mod constant_term;
pub mod count_wildcard;
pub mod json2_scan_hint;
pub mod parallelize_scan;
pub mod pass_distribution;
pub mod remove_duplicate;

View File

@@ -0,0 +1,237 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use common_function::scalars::json::json2_get::Json2GetFunction;
use datafusion::datasource::DefaultTableSource;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion_common::{Result, ScalarValue, TableReference};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{Expr, LogicalPlan};
use datafusion_optimizer::{OptimizerConfig, OptimizerRule};
use datatypes::data_type::ConcreteDataType;
use datatypes::json::requirement::JsonPathTarget;
use datatypes::types::JsonFormat;
use crate::dummy_catalog::DummyTableProvider;
/// Optimizer rule that collects the JSON paths a plan reads through
/// `Json2GetFunction` calls and passes the resulting type hints to the
/// `DummyTableProvider` behind each table scan.
#[derive(Debug)]
pub struct Json2ScanHintRule;
impl OptimizerRule for Json2ScanHintRule {
    fn name(&self) -> &str {
        "Json2ScanHintRule"
    }

    /// Collects the JSON2 requirements of `plan` and stores the derived type
    /// hints on every table scan backed by a `DummyTableProvider`.
    ///
    /// The plan nodes themselves are returned as they are; the hints are a
    /// side effect on the provider. Scans that received hints are still
    /// reported as transformed.
    fn rewrite(
        &self,
        plan: LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>> {
        let collector = Json2RequirementCollector::collect(&plan)?;
        if collector.is_empty() {
            return Ok(Transformed::no(plan));
        }

        plan.transform_down(&mut |node| {
            let LogicalPlan::TableScan(table_scan) = &node else {
                return Ok(Transformed::no(node));
            };

            // Only scans over a `DefaultTableSource` wrapping a
            // `DummyTableProvider` can take the hint.
            let provider = table_scan
                .source
                .as_any()
                .downcast_ref::<DefaultTableSource>()
                .and_then(|source| {
                    source
                        .table_provider
                        .as_any()
                        .downcast_ref::<DummyTableProvider>()
                });
            let Some(adapter) = provider else {
                return Ok(Transformed::no(node));
            };

            let hints = collector.deduce_required_json2_type(
                &table_scan.table_name,
                &adapter.region_metadata().schema,
            );
            adapter.with_json2_type_hint(&hints);
            Ok(Transformed::yes(node))
        })
    }
}
/// Identifies a column reference found in a plan expression.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct Json2ColumnKey {
// Table qualifier of the reference, copied from the column expression; `None` when unqualified.
relation: Option<TableReference>,
// Column name.
name: String,
}
/// Accumulates, per column, what a plan needs from it: specific JSON paths or
/// the whole column.
#[derive(Debug, Default)]
struct Json2RequirementCollector {
// JSON paths required per column, gathered from calls recognised by `extract_json2_get`.
path_targets: HashMap<Json2ColumnKey, JsonPathTarget>,
// Columns referenced directly, i.e. outside such a call.
full_columns: HashSet<Json2ColumnKey>,
}
impl Json2RequirementCollector {
    /// Walks every expression of every node in `plan` and records which JSON
    /// paths are read through recognised getter calls and which columns are
    /// referenced directly.
    ///
    /// A direct column reference overrides any paths recorded for the same
    /// key, because the whole value is needed in that case.
    fn collect(plan: &LogicalPlan) -> Result<Self> {
        let mut collector = Self::default();
        plan.apply(|node| {
            for expr in node.expressions() {
                let _ = expr.apply(|expr| {
                    if let Some((column, path)) = extract_json2_get(expr) {
                        if !collector.matches_full_column(&column) {
                            collector
                                .path_targets
                                .entry(column)
                                .or_default()
                                .require_path(&path);
                        }
                        // Skip only the arguments of this call (so its column
                        // argument is not counted as a full-column reference).
                        // `Stop` would abort the traversal of the whole
                        // expression and silently drop any later getter call
                        // or column reference, e.g. in `j.a = 1 AND j.b = 2`.
                        return Ok(TreeNodeRecursion::Jump);
                    }
                    if let Expr::Column(column) = expr {
                        let key = Json2ColumnKey {
                            relation: column.relation.clone(),
                            name: column.name.clone(),
                        };
                        collector.path_targets.remove(&key);
                        collector.full_columns.insert(key);
                    }
                    Ok(TreeNodeRecursion::Continue)
                })?;
            }
            Ok(TreeNodeRecursion::Continue)
        })?;
        Ok(collector)
    }

    /// Returns `true` when neither paths nor full-column references were recorded.
    fn is_empty(&self) -> bool {
        self.path_targets.is_empty() && self.full_columns.is_empty()
    }

    /// Returns `true` when `column` was already recorded as a full-column
    /// reference, either under the same key or under the unqualified key with
    /// the same name.
    fn matches_full_column(&self, column: &Json2ColumnKey) -> bool {
        self.full_columns.contains(column)
            || self.full_columns.contains(&Json2ColumnKey {
                relation: None,
                name: column.name.clone(),
            })
    }

    /// Computes the target data type for each JSON2 column of `schema`.
    ///
    /// A column gets an entry only when at least one path was recorded for it
    /// and it is never referenced as a full column. Recorded keys apply to the
    /// column when the name matches and the relation is either absent or
    /// equal to `table_name`.
    ///
    /// Returns a map from column name to the struct type that covers all
    /// required paths of that column.
    fn deduce_required_json2_type(
        &self,
        table_name: &TableReference,
        schema: &datatypes::schema::SchemaRef,
    ) -> HashMap<String, ConcreteDataType> {
        let mut types = HashMap::new();
        for column_schema in schema.column_schemas() {
            let ConcreteDataType::Json(json_type) = &column_schema.data_type else {
                continue;
            };
            if !matches!(json_type.format, JsonFormat::Json2) {
                continue;
            }

            let refers_to_column = |key: &Json2ColumnKey| {
                key.name == column_schema.name
                    && key.relation.as_ref().is_none_or(|x| x == table_name)
            };

            // Check the cheap exclusion first: a full-column reference means
            // the column must not be narrowed, so no target needs merging.
            if self.full_columns.iter().any(|key| refers_to_column(key)) {
                continue;
            }

            // Merge the targets of all matching keys (qualified and
            // unqualified) into one tree, borrowing instead of cloning them.
            let mut merged = JsonPathTarget::default();
            let matching_targets = self
                .path_targets
                .iter()
                .filter(|&(key, _)| refers_to_column(key))
                .map(|(_, target)| target);
            for target in matching_targets {
                if let Some(data_type) = target.build_type() {
                    merge_path_target_from_type(&mut merged, &data_type, "");
                }
            }

            // `merged` stays empty when no key matched, so nothing is inserted then.
            if let Some(data_type) = merged.build_type() {
                let _ = types.insert(column_schema.name.clone(), data_type);
            }
        }
        types
    }
}
/// Recognises a call to `Json2GetFunction` of the form `f(column, "literal path")`.
///
/// Returns the key of the column argument and the path string, or `None` when
/// `expr` is not such a call: a different function, a different number of
/// arguments, a first argument that is not a plain column, or a second
/// argument that is not a non-null string literal.
fn extract_json2_get(expr: &Expr) -> Option<(Json2ColumnKey, String)> {
    let Expr::ScalarFunction(ScalarFunction { func, args }) = expr else {
        return None;
    };
    if func.name() != Json2GetFunction::NAME {
        return None;
    }
    // Exactly two arguments: a column followed by a literal.
    let [Expr::Column(column), Expr::Literal(literal, _)] = args.as_slice() else {
        return None;
    };
    let path = match literal {
        ScalarValue::Utf8(Some(path))
        | ScalarValue::LargeUtf8(Some(path))
        | ScalarValue::Utf8View(Some(path)) => path,
        _ => return None,
    };

    let key = Json2ColumnKey {
        relation: column.relation.clone(),
        name: column.name.clone(),
    };
    Some((key, path.clone()))
}
/// Adds every leaf path of `data_type` to `target`.
///
/// Struct types are walked recursively, joining field names with `.` onto
/// `prefix`. Any other type is a leaf: its `prefix` is registered as a
/// required path unless it is empty.
fn merge_path_target_from_type(
    target: &mut JsonPathTarget,
    data_type: &ConcreteDataType,
    prefix: &str,
) {
    let ConcreteDataType::Struct(struct_type) = data_type else {
        if !prefix.is_empty() {
            target.require_path(prefix);
        }
        return;
    };

    let struct_fields = struct_type.fields();
    for field in struct_fields.iter() {
        let field_path = match prefix {
            "" => field.name().to_string(),
            _ => format!("{prefix}.{}", field.name()),
        };
        merge_path_target_from_type(target, field.data_type(), &field_path);
    }
}

View File

@@ -61,6 +61,7 @@ use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_B
use crate::optimizer::ExtensionAnalyzerRule;
use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
use crate::optimizer::json2_scan_hint::Json2ScanHintRule;
use crate::optimizer::parallelize_scan::ParallelizeScan;
use crate::optimizer::pass_distribution::PassDistribution;
use crate::optimizer::remove_duplicate::RemoveDuplicate;
@@ -171,6 +172,7 @@ impl QueryEngineState {
analyzer.rules.push(Arc::new(FixStateUdafOrderingAnalyzer));
let mut optimizer = Optimizer::new();
optimizer.rules.push(Arc::new(Json2ScanHintRule));
optimizer.rules.push(Arc::new(ScanHintRule));
// add physical optimizer

View File

@@ -12,12 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use common_error::ext::BoxedError;
use common_recordbatch::OrderOption;
use datafusion_expr::expr::Expr;
// Re-export vector types from datatypes to avoid duplication
use datatypes::data_type::ConcreteDataType;
pub use datatypes::schema::{VectorDistanceMetric, VectorIndexEngineType};
use strum::Display;
@@ -126,6 +128,8 @@ pub struct ScanRequest {
pub vector_search: Option<VectorSearchRequest>,
/// Whether to force reading region data in flat format.
pub force_flat_format: bool,
/// Optional target types for query-driven JSON2 concretization.
pub json2_column_types: HashMap<String, ConcreteDataType>,
}
impl Display for ScanRequest {
@@ -216,6 +220,14 @@ impl Display for ScanRequest {
self.force_flat_format
)?;
}
if !self.json2_column_types.is_empty() {
write!(
f,
"{}json2_column_types: {:?}",
delimiter.as_str(),
self.json2_column_types
)?;
}
write!(f, " }}")
}
}

View File

@@ -123,6 +123,40 @@ select j.y from json2_table order by ts;
| false |
+------------------------------------+
select j.c, j.a.b from json2_table order by ts;
+------------------------------------+--------------------------------------+
| json2_get(json2_table.j,Utf8("c")) | json2_get(json2_table.j,Utf8("a.b")) |
+------------------------------------+--------------------------------------+
| s1 | 1 |
| s2 | 2 |
| s3 | 3 |
| | 4 |
| s5 | |
| s6 | |
| [1] | s7 |
| s8 | 8 |
| s9 | |
| | 10 |
+------------------------------------+--------------------------------------+
select j.a.b, j.a from json2_table order by ts;
+--------------------------------------+------------------------------------+
| json2_get(json2_table.j,Utf8("a.b")) | json2_get(json2_table.j,Utf8("a")) |
+--------------------------------------+------------------------------------+
| 1 | {b: 1} |
| 2 | {b: 2} |
| 3 | {b: 3} |
| 4 | {b: 4} |
| | {b: } |
| | |
| s7 | {b: s7} |
| 8 | {b: 8} |
| | {b: } |
| 10 | {b: 10} |
+--------------------------------------+------------------------------------+
drop table json2_table;
Affected Rows: 0

View File

@@ -39,4 +39,8 @@ select j.c from json2_table order by ts;
select j.y from json2_table order by ts;
select j.c, j.a.b from json2_table order by ts;
select j.a.b, j.a from json2_table order by ts;
drop table json2_table;