feat: concretize json type from query

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-05-08 15:07:55 +08:00
parent b5997c6797
commit ee135caeb9
16 changed files with 578 additions and 109 deletions

1
Cargo.lock generated
View File

@@ -8168,6 +8168,7 @@ version = "1.0.0"
dependencies = [
"api",
"aquamarine",
"arrow-schema 57.3.0",
"async-channel 1.9.0",
"async-stream",
"async-trait",

View File

@@ -63,25 +63,18 @@ trait JsonGetResultBuilder {
fn build(&mut self) -> ArrayRef;
}
fn result_builder(
len: usize,
with_type: Option<&DataType>,
) -> Result<Box<dyn JsonGetResultBuilder>> {
let builder = if let Some(t) = with_type {
match t {
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
Box::new(StringResultBuilder(StringViewBuilder::with_capacity(len)))
as Box<dyn JsonGetResultBuilder>
}
DataType::Int64 => Box::new(IntResultBuilder(Int64Builder::with_capacity(len))),
DataType::Float64 => Box::new(FloatResultBuilder(Float64Builder::with_capacity(len))),
DataType::Boolean => Box::new(BoolResultBuilder(BooleanBuilder::with_capacity(len))),
t => {
return exec_err!("json_get with unknown type {t}");
}
fn result_builder(len: usize, with_type: &DataType) -> Result<Box<dyn JsonGetResultBuilder>> {
let builder = match with_type {
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
Box::new(StringResultBuilder(StringViewBuilder::with_capacity(len)))
as Box<dyn JsonGetResultBuilder>
}
DataType::Int64 => Box::new(IntResultBuilder(Int64Builder::with_capacity(len))),
DataType::Float64 => Box::new(FloatResultBuilder(Float64Builder::with_capacity(len))),
DataType::Boolean => Box::new(BoolResultBuilder(BooleanBuilder::with_capacity(len))),
t => {
return exec_err!("json_get with unknown type {t}");
}
} else {
Box::new(StringResultBuilder(StringViewBuilder::with_capacity(len)))
};
Ok(builder)
}
@@ -339,7 +332,7 @@ fn jsonb_get(
Ok(())
}
fn json_struct_get(array: &ArrayRef, path: &str, with_type: Option<&DataType>) -> Result<ArrayRef> {
fn json_struct_get(array: &ArrayRef, path: &str, with_type: &DataType) -> Result<ArrayRef> {
let path = path.trim_start_matches("$");
// Fast path: if the JSON array fields can be directly indexed into by the `path`, simply get
@@ -356,20 +349,13 @@ fn json_struct_get(array: &ArrayRef, path: &str, with_type: Option<&DataType>) -
return exec_err!("unknown JSON array datatype: {}", current.data_type());
};
let Some(sub_json) = json.column_by_name(segment) else {
return Ok(new_null_array(
with_type.unwrap_or(&DataType::Utf8View),
array.len(),
));
return Ok(new_null_array(with_type, array.len()));
};
current = sub_json;
}
// Build the result array with optional value mapper.
fn build_with<F>(
input: &ArrayRef,
with_type: Option<&DataType>,
value_mapper: F,
) -> Result<ArrayRef>
fn build_with<F>(input: &ArrayRef, with_type: &DataType, value_mapper: F) -> Result<ArrayRef>
where
for<'a> F: Fn(&'a Value) -> Option<&'a Value>,
{
@@ -397,20 +383,18 @@ fn json_struct_get(array: &ArrayRef, path: &str, with_type: Option<&DataType>) -
}
if direct {
let casted = if let Some(with_type) = with_type
&& current.data_type() != with_type
{
let casted = if current.data_type() != with_type {
match (current.data_type(), with_type) {
(DataType::Binary, _) => {
// Fall back to the slow path if the found JSON sub-array is serialized to bytes
// (because of JSON type conflicting)
build_with(current, Some(with_type), |v| Some(v))?
build_with(current, with_type, |v| Some(v))?
}
(DataType::List(_) | DataType::Struct(_), with_type) if with_type.is_string() => {
// Special handle for wanted array is string (Arrow cast is not working here if
// the datatype is list or struct), because it could be used in displaying the
// result.
build_with(current, Some(with_type), |v| Some(v))?
build_with(current, with_type, |v| Some(v))?
}
(_, with_type) if with_type.is_string() => {
// Same special handle for wanted array is string as above, except for simply
@@ -510,17 +494,22 @@ impl Function for JsonGetWithType {
);
};
let with_type = args.args.get(2).map(|x| x.data_type());
let with_type = args
.args
.get(2)
.map(|x| x.data_type())
.unwrap_or(DataType::Utf8View);
let result = match arg0.data_type() {
DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
let jsons = arg0.as_binary_view();
let mut builder = result_builder(len, with_type.as_ref())?;
let mut builder = result_builder(len, &with_type)?;
jsonb_get(jsons, path, builder.as_mut())?;
builder.build()
}
DataType::Struct(_) => json_struct_get(&arg0, path, with_type.as_ref())?,
DataType::Struct(_) => json_struct_get(&arg0, path, &with_type)?,
_ => {
return exec_err!("JSON_GET not supported argument type {}", arg0.data_type());
}

View File

@@ -23,9 +23,11 @@ use regex::{Captures, Regex};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use crate::Error;
use crate::data_type::DataType;
use crate::error::{
DeserializeSnafu, InvalidJsonSnafu, InvalidJsonbSnafu, MergeJsonDatatypeSnafu, Result,
UnsupportedArrowTypeSnafu,
};
use crate::prelude::ConcreteDataType;
use crate::scalars::ScalarVectorBuilder;
@@ -49,8 +51,9 @@ pub enum JsonNumberType {
F64,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, Default)]
pub enum JsonNativeType {
#[default]
Null,
Bool,
Number(JsonNumberType),
@@ -79,6 +82,41 @@ impl JsonNativeType {
pub fn f64() -> Self {
Self::Number(JsonNumberType::F64)
}
/// Merge other [JsonNativeType] into this.
/// Conflicting fields will be resolved to the "Variant" type.
pub fn merge(&mut self, other: &JsonNativeType) {
if self == other {
return;
}
fn merge_object(this: &mut JsonObjectType, that: &JsonObjectType) {
// merge "that" into "this" directly:
for (type_name, that_type) in that {
if let Some(this_type) = this.get_mut(type_name) {
this_type.merge(that_type);
} else {
this.insert(type_name.clone(), that_type.clone());
}
}
}
let zelf = std::mem::take(self);
*self = match (zelf, other) {
(JsonNativeType::Object(mut this), JsonNativeType::Object(that)) => {
merge_object(&mut this, that);
JsonNativeType::Object(this)
}
(JsonNativeType::Array(mut this), JsonNativeType::Array(that)) => {
this.merge(that);
JsonNativeType::Array(this)
}
(JsonNativeType::Null, that) => that.clone(),
(this, JsonNativeType::Null) => this,
(this, that) if this == *that => this,
_ => JsonNativeType::Variant,
};
}
}
impl From<&JsonNativeType> for ConcreteDataType {
@@ -139,6 +177,56 @@ impl From<&ConcreteDataType> for JsonNativeType {
}
}
impl TryFrom<&ArrowDataType> for JsonNativeType {
type Error = Error;
fn try_from(t: &ArrowDataType) -> Result<Self> {
let t = match t {
ArrowDataType::Null => JsonNativeType::Null,
ArrowDataType::Boolean => JsonNativeType::Bool,
ArrowDataType::Int8
| ArrowDataType::Int16
| ArrowDataType::Int32
| ArrowDataType::Int64 => JsonNativeType::i64(),
ArrowDataType::UInt8
| ArrowDataType::UInt16
| ArrowDataType::UInt32
| ArrowDataType::UInt64 => JsonNativeType::u64(),
ArrowDataType::Float16 | ArrowDataType::Float32 | ArrowDataType::Float64 => {
JsonNativeType::f64()
}
ArrowDataType::Binary
| ArrowDataType::FixedSizeBinary(_)
| ArrowDataType::LargeBinary
| ArrowDataType::BinaryView => JsonNativeType::Variant,
ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Utf8View => {
JsonNativeType::String
}
ArrowDataType::List(field)
| ArrowDataType::ListView(field)
| ArrowDataType::FixedSizeList(field, _)
| ArrowDataType::LargeList(field)
| ArrowDataType::LargeListView(field) => {
JsonNativeType::Array(Box::new(Self::try_from(field.data_type())?))
}
ArrowDataType::Struct(fields) => {
let mut object = JsonObjectType::new();
for field in fields {
object.insert(field.name().clone(), Self::try_from(field.data_type())?);
}
JsonNativeType::Object(object)
}
t => {
return UnsupportedArrowTypeSnafu {
arrow_type: t.clone(),
}
.fail();
}
};
Ok(t)
}
}
impl Display for JsonNativeType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
@@ -229,11 +317,10 @@ impl JsonType {
return Ok(());
}
match (&self.format, &other.format) {
match (&mut self.format, &other.format) {
(JsonFormat::Jsonb, JsonFormat::Jsonb) => Ok(()),
(JsonFormat::Json2(this), JsonFormat::Json2(that)) => {
let merged = merge(this.as_ref(), that.as_ref());
self.format = JsonFormat::Json2(Box::new(merged));
this.merge(that);
Ok(())
}
_ => MergeJsonDatatypeSnafu {
@@ -289,34 +376,6 @@ pub(crate) fn plain_json_struct_type(item_type: ConcreteDataType) -> StructType
StructType::new(Arc::new(vec![field]))
}
fn merge(this: &JsonNativeType, that: &JsonNativeType) -> JsonNativeType {
fn merge_object(this: &JsonObjectType, that: &JsonObjectType) -> JsonObjectType {
let mut this = this.clone();
// merge "that" into "this" directly:
for (type_name, that_type) in that {
if let Some(this_type) = this.get_mut(type_name) {
let merged_type = merge(this_type, that_type);
*this_type = merged_type;
} else {
this.insert(type_name.clone(), that_type.clone());
}
}
this
}
match (this, that) {
(this, that) if this == that => this.clone(),
(JsonNativeType::Array(this), JsonNativeType::Array(that)) => {
JsonNativeType::Array(Box::new(merge(this.as_ref(), that.as_ref())))
}
(JsonNativeType::Object(this), JsonNativeType::Object(that)) => {
JsonNativeType::Object(merge_object(this, that))
}
(JsonNativeType::Null, x) | (x, JsonNativeType::Null) => x.clone(),
_ => JsonNativeType::Variant,
}
}
impl From<&ArrowDataType> for JsonType {
fn from(t: &ArrowDataType) -> Self {
JsonType::new_json2(JsonNativeType::from(&ConcreteDataType::from_arrow_type(t)))

View File

@@ -22,7 +22,7 @@ use arrow_array::types::{Float64Type, Int64Type, UInt64Type};
use arrow_array::{Array, ArrayRef, GenericListArray, ListArray, StructArray, new_null_array};
use arrow_schema::{DataType, FieldRef};
use serde_json::Value;
use snafu::{OptionExt, ResultExt, ensure};
use snafu::{OptionExt, ResultExt};
use crate::arrow_array::{StringArray, binary_array_value, string_array_value};
use crate::error::{
@@ -90,14 +90,12 @@ impl JsonArray<'_> {
Ok(value)
}
/// Align a JSON array to the `expect` data type. The `expect` data type is often the "largest"
/// JSON type after some insertions in the table schema, while the JSON array previously
/// written in the SST could be lagged behind it. So it's important to "align" the JSON array by
/// setting the missing fields with null arrays, or casting the data.
/// Align a JSON array to the `expect` data type. The alignment mostly does three things:
///
/// It's an error if the to-be-aligned array contains extra fields that are not in the `expect`
/// data type. Forcing to align that kind of array will result in data loss, something we
/// generally not wanted.
/// 1. set the missing fields with null arrays;
/// 2. discard the fields that are not in the `expect` data type;
/// 3. cast the fields to the ones with same names in the `expect` if their data types are not
/// matched.
pub fn try_align(&self, expect: &DataType) -> Result<ArrayRef> {
if self.inner.data_type() == expect {
return Ok(self.inner.clone());
@@ -155,10 +153,7 @@ impl JsonArray<'_> {
i += 1;
}
Ordering::Greater => {
return AlignJsonArraySnafu {
reason: format!("extra fields are found: [{}]", array_field.name()),
}
.fail();
j += 1;
}
}
}
@@ -167,19 +162,6 @@ impl JsonArray<'_> {
aligned.push(new_null_array(field.data_type(), struct_array.len()));
}
}
ensure!(
j >= array_fields.len(),
AlignJsonArraySnafu {
reason: format!(
"extra fields are found: [{}]",
array_fields[j..]
.iter()
.map(|x| x.name().as_str())
.collect::<Vec<_>>()
.join(", ")
),
}
);
let json_array = StructArray::try_new(
expect_fields.clone(),
@@ -475,17 +457,6 @@ mod test {
)
.test()?;
// Test align failed.
TestCase::new(
StructArray::try_from(vec![
("i", Arc::new(Int64Array::from(vec![1])) as ArrayRef),
("j", Arc::new(Int64Array::from(vec![2])) as ArrayRef),
])
.unwrap(),
Fields::from(vec![Field::new("i", DataType::Int64, true)]),
Err("Failed to align JSON array, reason: extra fields are found: [j]".to_string()),
)
.test()?;
Ok(())
}
}

View File

@@ -16,6 +16,7 @@ workspace = true
[dependencies]
api.workspace = true
aquamarine.workspace = true
arrow-schema.workspace = true
async-channel = "1.9"
common-stat.workspace = true
async-stream.workspace = true

View File

@@ -18,14 +18,18 @@ use std::sync::Arc;
use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu, NewDfRecordBatchSnafu};
use common_recordbatch::error::{
ArrowComputeSnafu, DataTypesSnafu, ExternalSnafu, NewDfRecordBatchSnafu,
};
use common_recordbatch::{DfRecordBatch, RecordBatch};
use datatypes::arrow::array::Array;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field};
use datatypes::extension::json::is_json_extension_type;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::Helper;
use datatypes::vectors::json::array::JsonArray;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
@@ -238,6 +242,10 @@ impl FlatProjectionMapper {
self.output_schema.clone()
}
pub(crate) fn with_output_schema(&mut self, schema: SchemaRef) {
self.output_schema = schema;
}
/// Converts a flat format [RecordBatch] to a normal [RecordBatch].
///
/// The batch must match the `projection` using to build the mapper.
@@ -283,6 +291,14 @@ impl FlatProjectionMapper {
array = casted;
}
}
let field = &self.output_schema.arrow_schema().fields()[output_idx];
if is_json_extension_type(field) {
array = JsonArray::from(&array)
.try_align(field.data_type())
.context(DataTypesSnafu)?;
}
arrays.push(array);
}

View File

@@ -14,7 +14,7 @@
//! Scans a region according to the scan request.
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::num::NonZeroU64;
use std::sync::Arc;
@@ -31,6 +31,11 @@ use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
use datafusion_common::Column;
use datafusion_expr::Expr;
use datafusion_expr::utils::expr_to_columns;
use datatypes::data_type::ConcreteDataType;
use datatypes::extension::json::is_json_extension_type;
use datatypes::schema::Schema;
use datatypes::schema::ext::ArrowSchemaExt;
use datatypes::types::json_type::JsonNativeType;
use futures::StreamExt;
use partition::expr::PartitionExpr;
use smallvec::SmallVec;
@@ -424,7 +429,7 @@ impl ScanRegion {
let read_col_ids = read_cols.column_ids();
// The mapper always computes projected column ids as the schema of SSTs may change.
let mapper = match self.request.projection_indices() {
let mut mapper = match self.request.projection_indices() {
Some(p) => FlatProjectionMapper::new_with_read_columns(
&self.version.metadata,
p.to_vec(),
@@ -432,6 +437,7 @@ impl ScanRegion {
)?,
None => FlatProjectionMapper::all(&self.version.metadata)?,
};
concretize_json_types(&mut mapper, &self.request.json_type_hint);
let ssts = &self.version.ssts;
let mut files = Vec::new();
@@ -727,6 +733,34 @@ impl ScanRegion {
}
}
fn concretize_json_types(
mapper: &mut FlatProjectionMapper,
json_type_hint: &HashMap<String, JsonNativeType>,
) {
let output_schema = mapper.output_schema();
let output_arrow_schema = output_schema.arrow_schema();
if !output_arrow_schema.has_json_extension_field() {
return;
}
let mut column_schemas = output_schema.column_schemas().to_vec();
for (idx, column_schema) in column_schemas.iter_mut().enumerate() {
if !is_json_extension_type(&output_arrow_schema.fields()[idx]) {
continue;
}
let Some(json_type) = json_type_hint.get(&column_schema.name) else {
continue;
};
column_schema.data_type = ConcreteDataType::from(json_type);
}
let output_schema = Arc::new(Schema::new_with_version(
column_schemas,
output_schema.version(),
));
mapper.with_output_schema(output_schema);
}
/// Returns true if the time range of a SST `file` matches the `predicate`.
fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
if predicate == &TimestampRange::min_to_max() {

View File

@@ -33,6 +33,7 @@ use datatypes::arrow::datatypes::{Field, Schema as ArrowSchema, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use datatypes::schema::ext::ArrowSchemaExt;
use futures::StreamExt;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
@@ -436,8 +437,11 @@ impl ParquetReaderBuilder {
.unwrap_or_else(|| region_meta.schema.clone());
// Create ArrowReaderMetadata for async stream building.
let arrow_reader_options =
ArrowReaderOptions::new().with_schema(read_format.arrow_schema().clone());
let mut arrow_reader_options = ArrowReaderOptions::new();
if !read_format.arrow_schema().has_json_extension_field() {
arrow_reader_options =
arrow_reader_options.with_schema(read_format.arrow_schema().clone());
}
let arrow_metadata =
ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options)
.context(ReadDataPartSnafu)?;

View File

@@ -15,6 +15,7 @@
//! Dummy catalog for region server.
use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, Mutex};
@@ -30,6 +31,7 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::DataFusionError;
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::types::json_type::JsonNativeType;
use futures::stream::BoxStream;
use session::context::{QueryContext, QueryContextRef};
use snafu::ResultExt;
@@ -286,6 +288,10 @@ impl DummyTableProvider {
self.scan_request.lock().unwrap().memtable_max_sequence = Some(sequence);
}
pub(crate) fn with_json_type_hint(&self, hint: HashMap<String, JsonNativeType>) {
self.scan_request.lock().unwrap().json_type_hint = hint;
}
/// Gets the scan request of the provider.
#[cfg(test)]
pub fn scan_request(&self) -> ScanRequest {

View File

@@ -15,6 +15,7 @@
pub mod constant_term;
pub mod count_nest_aggr;
pub mod count_wildcard;
pub(crate) mod json_type_concretize;
pub mod parallelize_scan;
pub mod pass_distribution;
pub mod remove_duplicate;

View File

@@ -0,0 +1,308 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use arrow_schema::DataType;
use common_function::scalars::json::json_get::JsonGetWithType;
use datafusion::datasource::DefaultTableSource;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion_common::{Result, plan_datafusion_err, plan_err};
use datafusion_expr::{Expr, LogicalPlan};
use datafusion_optimizer::{OptimizerConfig, OptimizerRule};
use datatypes::types::json_type::{JsonNativeType, JsonObjectType};
use crate::dummy_catalog::DummyTableProvider;
/// Concretize (deduce) the expected JSON type from query.
/// For example, we can concretize a JSON type of `{ a: { b: Number } }` from `select j.a.b::Int64`.
/// The JSON type will be later set into the scan request, for converting the JSON arrays.
#[derive(Debug)]
pub(crate) struct JsonTypeConcretizeRule;
impl OptimizerRule for JsonTypeConcretizeRule {
fn name(&self) -> &str {
"JsonTypeConcretizeRule"
}
fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
let json_types = deduce_json_types(&plan)?;
if json_types.is_empty() {
return Ok(Transformed::no(plan));
}
plan.transform_down(|plan| match &plan {
LogicalPlan::TableScan(table_scan) => {
let Some(source) = table_scan
.source
.as_any()
.downcast_ref::<DefaultTableSource>()
else {
return Ok(Transformed::no(plan));
};
let Some(adapter) = source
.table_provider
.as_any()
.downcast_ref::<DummyTableProvider>()
else {
return Ok(Transformed::no(plan));
};
adapter.with_json_type_hint(json_types.clone());
Ok(Transformed::yes(plan))
}
_ => Ok(Transformed::no(plan)),
})
}
}
fn deduce_json_types(plan: &LogicalPlan) -> Result<HashMap<String, JsonNativeType>> {
let mut json_types = HashMap::<String, JsonNativeType>::new();
plan.apply(|plan| {
for expr in plan.expressions() {
expr.apply(|expr| {
if let Some((column, json_type)) = deduce_json_type(expr)? {
json_types.entry(column).or_default().merge(&json_type);
}
Ok(TreeNodeRecursion::Continue)
})?;
}
Ok(TreeNodeRecursion::Continue)
})?;
Ok(json_types)
}
fn deduce_json_type(expr: &Expr) -> Result<Option<(String, JsonNativeType)>> {
let f = match expr {
Expr::ScalarFunction(f) if f.name().eq_ignore_ascii_case(JsonGetWithType::NAME) => f,
_ => return Ok(None),
};
let Some(Expr::Column(column)) = f.args.first() else {
return plan_err!(
"First argument of {} is expected to be a column expr, actual: {}",
JsonGetWithType::NAME,
f.args[0]
);
};
let Some(path) = f
.args
.get(1)
.and_then(|expr| expr.as_literal())
.and_then(|x| x.try_as_str())
.flatten()
else {
return plan_err!(
"Second argument of {} is expected to be a string literal, actual: {}",
JsonGetWithType::NAME,
f.args[1]
);
};
let with_type = f
.args
.get(2)
.and_then(|expr| expr.as_literal())
.map(|x| x.data_type())
.unwrap_or(DataType::Utf8View);
let with_type =
JsonNativeType::try_from(&with_type).map_err(|e| plan_datafusion_err!("{e:?}"))?;
let mut split = path.rsplit(".");
let Some(leaf) = split.next() else {
return Ok(Some((column.name.clone(), JsonNativeType::String)));
};
let mut object = JsonObjectType::new();
object.insert(leaf.to_string(), with_type);
let mut root = JsonNativeType::Object(object);
for s in split {
let mut object = JsonObjectType::new();
object.insert(s.to_string(), root);
root = JsonNativeType::Object(object);
}
Ok(Some((column.name.clone(), root)))
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_function::scalars::udf::create_udf;
use datafusion::datasource::provider_as_source;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{LogicalPlanBuilder, col};
use datafusion_optimizer::OptimizerContext;
use store_api::storage::RegionId;
use super::*;
use crate::optimizer::test_util::mock_table_provider;
fn json_get_expr(base: Expr, path: Expr, with_type: Option<DataType>) -> Result<Expr> {
let json_get = Arc::new(create_udf(Arc::new(JsonGetWithType::default())));
let mut args = vec![base, path];
if let Some(with_type) = with_type {
let with_type = ScalarValue::try_new_null(&with_type)?;
args.push(Expr::Literal(with_type, None));
}
Ok(Expr::ScalarFunction(ScalarFunction::new_udf(
json_get, args,
)))
}
fn path_expr(path: &str) -> Expr {
Expr::Literal(ScalarValue::Utf8(Some(path.to_string())), None)
}
fn build_plan(exprs: Vec<Expr>) -> Result<(Arc<DummyTableProvider>, LogicalPlan)> {
let provider = Arc::new(mock_table_provider(RegionId::new(1024, 1)));
let plan = LogicalPlanBuilder::scan("t", provider_as_source(provider.clone()), None)?
.project(exprs)?
.build()?;
Ok((provider, plan))
}
#[test]
fn test_json_type_concretize_rule_rewrite() -> Result<()> {
let exprs = vec![
json_get_expr(col("k0"), path_expr("a.b"), Some(DataType::Int64))?.alias("ab"),
json_get_expr(col("k0"), path_expr("a.c"), None)?.alias("ac"),
json_get_expr(col("k0"), path_expr("d"), Some(DataType::Boolean))?.alias("d"),
];
let (provider, plan) = build_plan(exprs)?;
assert!(
JsonTypeConcretizeRule
.rewrite(plan, &OptimizerContext::default())?
.transformed
);
let expected = JsonNativeType::Object(JsonObjectType::from([
(
"a".to_string(),
JsonNativeType::Object(JsonObjectType::from([
("b".to_string(), JsonNativeType::i64()),
("c".to_string(), JsonNativeType::String),
])),
),
("d".to_string(), JsonNativeType::Bool),
]));
let request = provider.scan_request();
assert_eq!(1, request.json_type_hint.len());
assert_eq!(Some(&expected), request.json_type_hint.get("k0"));
Ok(())
}
#[test]
fn test_json_type_concretize_rule_conflict_to_variant() -> Result<()> {
let exprs = vec![
json_get_expr(col("k0"), path_expr("a"), Some(DataType::Int64))?.alias("a_num"),
json_get_expr(col("k0"), path_expr("a.b"), Some(DataType::Boolean))?.alias("a_obj"),
];
let (provider, plan) = build_plan(exprs)?;
assert!(
JsonTypeConcretizeRule
.rewrite(plan, &OptimizerContext::default())?
.transformed
);
let expected = JsonNativeType::Object(JsonObjectType::from([(
"a".to_string(),
JsonNativeType::Variant,
)]));
assert_eq!(
Some(&expected),
provider.scan_request().json_type_hint.get("k0")
);
Ok(())
}
#[test]
fn test_json_type_concretize_rule_no_json_get() -> Result<()> {
let (provider, plan) = build_plan(vec![col("k0"), col("v0")])?;
assert!(
!JsonTypeConcretizeRule
.rewrite(plan, &OptimizerContext::default())?
.transformed
);
assert!(provider.scan_request().json_type_hint.is_empty());
Ok(())
}
#[test]
fn test_deduce_json_type_with_non_column_base() -> Result<()> {
let expr = json_get_expr(
Expr::Literal(ScalarValue::Utf8(Some("{}".to_string())), None),
path_expr("a"),
Some(DataType::Int64),
)?;
let err = deduce_json_type(&expr).unwrap_err();
assert!(
err.to_string()
.contains("First argument of json_get is expected to be a column expr")
);
Ok(())
}
#[test]
fn test_deduce_json_type_with_non_literal_path() -> Result<()> {
let expr = json_get_expr(
Expr::Column(Column::new_unqualified("k0")),
Expr::Column(Column::new_unqualified("path_col")),
Some(DataType::Int64),
)?;
let err = deduce_json_type(&expr).unwrap_err();
assert!(
err.to_string()
.contains("Second argument of json_get is expected to be a string literal")
);
Ok(())
}
#[test]
fn test_deduce_json_type_default_string() -> Result<()> {
let expr = json_get_expr(
Expr::Column(Column::new_unqualified("k0")),
path_expr("a.b"),
None,
)?;
let deduced = deduce_json_type(&expr)?;
let expected = JsonNativeType::Object(JsonObjectType::from([(
"a".to_string(),
JsonNativeType::Object(JsonObjectType::from([(
"b".to_string(),
JsonNativeType::String,
)])),
)]));
assert_eq!(Some(("k0".to_string(), expected)), deduced);
Ok(())
}
}

View File

@@ -62,6 +62,7 @@ use crate::optimizer::ExtensionAnalyzerRule;
use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
use crate::optimizer::count_nest_aggr::CountNestAggrRule;
use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
use crate::optimizer::json_type_concretize::JsonTypeConcretizeRule;
use crate::optimizer::parallelize_scan::ParallelizeScan;
use crate::optimizer::pass_distribution::PassDistribution;
use crate::optimizer::remove_duplicate::RemoveDuplicate;
@@ -174,6 +175,7 @@ impl QueryEngineState {
let mut optimizer = Optimizer::new();
optimizer.rules.push(Arc::new(ScanHintRule));
optimizer.rules.push(Arc::new(JsonTypeConcretizeRule));
// add physical optimizer
let mut physical_optimizer = PhysicalOptimizer::new();

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use common_error::ext::BoxedError;
@@ -19,6 +20,8 @@ use common_recordbatch::OrderOption;
use datafusion_expr::expr::Expr;
// Re-export vector types from datatypes to avoid duplication
pub use datatypes::schema::{VectorDistanceMetric, VectorIndexEngineType};
use datatypes::types::json_type::JsonNativeType;
use itertools::Itertools;
use strum::Display;
use crate::storage::{ColumnId, ProjectionInput, SequenceNumber};
@@ -128,6 +131,8 @@ pub struct ScanRequest {
/// Optional hint for KNN vector search. When set, the scan should use
/// vector index to find the k nearest neighbors.
pub vector_search: Option<VectorSearchRequest>,
/// Optional hint from query-driven JSON type concretization.
pub json_type_hint: HashMap<String, JsonNativeType>,
}
impl ScanRequest {
@@ -227,6 +232,17 @@ impl Display for ScanRequest {
vector_search.metric
)?;
}
if !self.json_type_hint.is_empty() {
write!(
f,
"{}json_type_hint: {}",
delimiter.as_str(),
self.json_type_hint
.iter()
.map(|(column, json_type)| format!("({column}: {json_type})"))
.join(", ")
)?;
}
write!(f, " }}")
}
}

View File

@@ -131,6 +131,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
|_|_TableScan: test_|
|_| ]]_|
| logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_|
| logical_plan after JsonTypeConcretizeRule_| SAME TEXT AS ABOVE_|
| logical_plan after rewrite_set_comparison_| SAME TEXT AS ABOVE_|
| logical_plan after optimize_unions_| SAME TEXT AS ABOVE_|
| logical_plan after simplify_expressions_| SAME TEXT AS ABOVE_|
@@ -156,6 +157,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
| logical_plan after push_down_leaf_projections_| SAME TEXT AS ABOVE_|
| logical_plan after optimize_projections_| SAME TEXT AS ABOVE_|
| logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_|
| logical_plan after JsonTypeConcretizeRule_| SAME TEXT AS ABOVE_|
| logical_plan_| MergeScan [is_placeholder=false, remote_input=[_|
|_| PromInstantManipulate: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|
|_|_PromSeriesDivide: tags=["k"]_|
@@ -276,6 +278,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test AS series;
|_|_TableScan: test_|
|_| ]]_|
| logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_|
| logical_plan after JsonTypeConcretizeRule_| SAME TEXT AS ABOVE_|
| logical_plan after rewrite_set_comparison_| SAME TEXT AS ABOVE_|
| logical_plan after optimize_unions_| SAME TEXT AS ABOVE_|
| logical_plan after simplify_expressions_| SAME TEXT AS ABOVE_|
@@ -301,6 +304,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test AS series;
| logical_plan after push_down_leaf_projections_| SAME TEXT AS ABOVE_|
| logical_plan after optimize_projections_| SAME TEXT AS ABOVE_|
| logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_|
| logical_plan after JsonTypeConcretizeRule_| SAME TEXT AS ABOVE_|
| logical_plan_| MergeScan [is_placeholder=false, remote_input=[_|
|_| Projection: test.i AS series, test.k, test.j_|
|_|_PromInstantManipulate: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|

View File

@@ -92,6 +92,57 @@ explain select j.a.x::bool from json2_table;
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------+
select j.a.b from json2_table order by ts;
+-------------------------------------+
| json_get(json2_table.j,Utf8("a.b")) |
+-------------------------------------+
| 1 |
| -2 |
| 3 |
| -4 |
| |
| |
| "s7" |
| 8 |
| |
| 10 |
+-------------------------------------+
select j.c, j.y from json2_table order by ts;
+-----------------------------------+-----------------------------------+
| json_get(json2_table.j,Utf8("c")) | json_get(json2_table.j,Utf8("y")) |
+-----------------------------------+-----------------------------------+
| s1 | |
| s2 | |
| s3 | |
| | |
| s5 | |
| s6 | |
| [1] | |
| "s8" | |
| s9 | |
| | false |
+-----------------------------------+-----------------------------------+
select j.d from json2_table order by ts;
+-----------------------------------+
| json_get(json2_table.j,Utf8("d")) |
+-----------------------------------+
| [{e: {f: 0.1}}] |
| [{e: {f: 0.2}}] |
| |
| [{e: {g: -0.4}}] |
| |
| |
| [{e: {g: -0.7}}] |
| |
| [{e: {g: -0.9}}] |
| |
+-----------------------------------+
drop table json2_table;
Affected Rows: 0

View File

@@ -38,4 +38,10 @@ explain select j.a.b from json2_table;
-- SQLNESS REPLACE (peers.*) REDACTED
explain select j.a.x::bool from json2_table;
select j.a.b from json2_table order by ts;
select j.c, j.y from json2_table order by ts;
select j.d from json2_table order by ts;
drop table json2_table;