From 160b7e720b4f12b538f851658fd7ee9669a24b49 Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Thu, 7 May 2026 14:44:46 +0800 Subject: [PATCH] feat: json expr planner (#8066) Signed-off-by: luofucong --- Cargo.lock | 1 + .../function/src/scalars/json/json_get.rs | 4 +- src/query/Cargo.toml | 1 + src/query/src/datafusion.rs | 1 + src/query/src/datafusion/json_expr_planner.rs | 286 ++++++++++++++++++ src/query/src/datafusion/planner.rs | 6 +- .../standalone/common/types/json/json2.result | 30 ++ .../standalone/common/types/json/json2.sql | 6 + 8 files changed, 332 insertions(+), 3 deletions(-) create mode 100644 src/query/src/datafusion/json_expr_planner.rs diff --git a/Cargo.lock b/Cargo.lock index 134dd36782..d51631d993 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11093,6 +11093,7 @@ dependencies = [ "datafusion-physical-expr", "datafusion-sql", "datatypes", + "either", "fastrand", "futures", "futures-util", diff --git a/src/common/function/src/scalars/json/json_get.rs b/src/common/function/src/scalars/json/json_get.rs index 00f528d0a9..a12ca5691d 100644 --- a/src/common/function/src/scalars/json/json_get.rs +++ b/src/common/function/src/scalars/json/json_get.rs @@ -439,12 +439,12 @@ fn json_struct_get(array: &ArrayRef, path: &str, with_type: Option<&DataType>) - /// use the third argument's type to determine the return type. #[derive(Debug, Display)] #[display("{}", Self::NAME.to_ascii_uppercase())] -pub(super) struct JsonGetWithType { +pub struct JsonGetWithType { signature: Signature, } impl JsonGetWithType { - pub(crate) const NAME: &'static str = "json_get"; + pub const NAME: &'static str = "json_get"; } impl Default for JsonGetWithType { diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index af81325aa9..6bbd9e1dd1 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -45,6 +45,7 @@ datafusion-optimizer.workspace = true datafusion-physical-expr.workspace = true datafusion-sql.workspace = true datatypes.workspace = true +either.workspace = true futures.workspace = true futures-util.workspace = true greptime-proto.workspace = true diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 6fc78c59e5..46bb47c788 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -15,6 +15,7 @@ //! Planner, QueryEngine implementations based on DataFusion. mod error; +mod json_expr_planner; mod planner; use std::any::Any; diff --git a/src/query/src/datafusion/json_expr_planner.rs b/src/query/src/datafusion/json_expr_planner.rs new file mode 100644 index 0000000000..4546c0a84d --- /dev/null +++ b/src/query/src/datafusion/json_expr_planner.rs @@ -0,0 +1,286 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::{Arc, LazyLock}; + +use arrow_schema::Field; +use arrow_schema::extension::ExtensionType; +use common_function::scalars::json::json_get::JsonGetWithType; +use common_function::scalars::udf::create_udf; +use datafusion_common::arrow::datatypes::DataType; +use datafusion_common::{Column, DFSchema, Result, ScalarValue, TableReference}; +use datafusion_expr::expr::{BinaryExpr, ScalarFunction}; +use datafusion_expr::planner::{ExprPlanner, PlannerResult, RawBinaryExpr}; +use datafusion_expr::{Expr, ExprSchemable, Operator, ScalarUDF}; +use datatypes::extension::json::JsonExtensionType; +use either::Either; +use sqlparser::ast::BinaryOperator; + +/// Rewrites JSON-aware SQL expressions into DataFusion expressions. +/// +/// This planner handles two cases: +/// - Rewrites compound identifiers on JSON extension columns into `json_get` function. +/// For example, `select a.b.c` => `select json_get(a, "b.c")`. +/// - Pushes an "expected type" argument into the `json_get` function when it participates in a +/// binary operator. So that `json_get` knows the wanted data type when dealing with variant +/// JSON values. +/// For example, `select json_get(a, "b.c") + 1` => `select json_get(a, "b.c", NULL::Int64) + 1`. +#[derive(Debug)] +pub(crate) struct JsonExprPlanner; + +impl ExprPlanner for JsonExprPlanner { + fn plan_binary_op( + &self, + expr: RawBinaryExpr, + schema: &DFSchema, + ) -> Result> { + let RawBinaryExpr { + op, + mut left, + mut right, + } = expr; + + if extract_untyped_json_get(&mut left).is_none() + && extract_untyped_json_get(&mut right).is_none() + { + return Ok(PlannerResult::Original(RawBinaryExpr { op, left, right })); + } + + let Some(expr_op) = parse_sql_op(&op) else { + return Ok(PlannerResult::Original(RawBinaryExpr { op, left, right })); + }; + + let left_type = left.get_type(schema)?; + let right_type = right.get_type(schema)?; + let left = push_json_get_type_arg(left, right_type)?; + let right = push_json_get_type_arg(right, left_type)?; + match (left, right) { + (Either::Left(left), Either::Left(right)) => { + Ok(PlannerResult::Original(RawBinaryExpr { op, left, right })) + } + (left, right) => Ok(PlannerResult::Planned(Expr::BinaryExpr(BinaryExpr::new( + Box::new(left.into_inner()), + expr_op, + Box::new(right.into_inner()), + )))), + } + } + + fn plan_compound_identifier( + &self, + field: &Field, + qualifier: Option<&TableReference>, + nested_names: &[String], + ) -> Result>> { + if field.extension_type_name() != Some(JsonExtensionType::NAME) { + return Ok(PlannerResult::Original(Vec::new())); + } + + static JSON_GET_UDF: LazyLock> = + LazyLock::new(|| Arc::new(create_udf(Arc::new(JsonGetWithType::default())))); + + let json_get = JSON_GET_UDF.clone(); + let path = nested_names.join("."); + Ok(PlannerResult::Planned(Expr::ScalarFunction( + ScalarFunction::new_udf( + json_get, + vec![ + Expr::Column(Column::from((qualifier, field))), + Expr::Literal(ScalarValue::Utf8(Some(path)), None), + ], + ), + ))) + } +} + +fn extract_untyped_json_get(expr: &mut Expr) -> Option<&mut ScalarFunction> { + match expr { + Expr::ScalarFunction(f) + if f.func.name().eq_ignore_ascii_case(JsonGetWithType::NAME) && f.args.len() == 2 => + { + Some(f) + } + _ => None, + } +} + +fn push_json_get_type_arg(mut expr: Expr, data_type: DataType) -> Result> { + let Some(json_get) = extract_untyped_json_get(&mut expr) else { + return Ok(Either::Left(expr)); + }; + + let with_type = ScalarValue::try_new_null(&data_type).map(|x| Expr::Literal(x, None))?; + json_get.args.push(with_type); + + Ok(Either::Right(expr)) +} + +fn parse_sql_op(op: &BinaryOperator) -> Option { + match *op { + BinaryOperator::Plus => Some(Operator::Plus), + BinaryOperator::Minus => Some(Operator::Minus), + BinaryOperator::Multiply => Some(Operator::Multiply), + BinaryOperator::Divide => Some(Operator::Divide), + BinaryOperator::Modulo => Some(Operator::Modulo), + BinaryOperator::Gt => Some(Operator::Gt), + BinaryOperator::GtEq => Some(Operator::GtEq), + BinaryOperator::Lt => Some(Operator::Lt), + BinaryOperator::LtEq => Some(Operator::LtEq), + BinaryOperator::Eq => Some(Operator::Eq), + BinaryOperator::NotEq => Some(Operator::NotEq), + BinaryOperator::And => Some(Operator::And), + BinaryOperator::Or => Some(Operator::Or), + BinaryOperator::BitwiseAnd => Some(Operator::BitwiseAnd), + BinaryOperator::BitwiseOr => Some(Operator::BitwiseOr), + BinaryOperator::BitwiseXor => Some(Operator::BitwiseXor), + _ => None, + } +} + +#[cfg(test)] +mod tests { + use arrow_schema::Fields; + use datatypes::extension::json::JsonMetadata; + + use super::*; + + fn json_get_expr(base: Expr, path: &str) -> Expr { + let json_get = Arc::new(create_udf(Arc::new(JsonGetWithType::default()))); + Expr::ScalarFunction(ScalarFunction::new_udf( + json_get, + vec![ + base, + Expr::Literal(ScalarValue::Utf8(Some(path.to_string())), None), + ], + )) + } + + fn json_field(name: &str) -> Field { + Field::new(name, DataType::Binary, true) + .with_extension_type(JsonExtensionType::new(Arc::new(JsonMetadata::default()))) + } + + #[test] + fn test_plan_binary_op() -> Result<()> { + let planner = JsonExprPlanner; + let schema = DFSchema::from_unqualified_fields( + Fields::from(vec![Field::new("value", DataType::Int64, true)]), + Default::default(), + )?; + + let planned = planner.plan_binary_op( + RawBinaryExpr { + op: BinaryOperator::Eq, + left: json_get_expr( + Expr::Literal(ScalarValue::Binary(Some(b"{\"a\": 1}".to_vec())), None), + "a", + ), + right: Expr::Column(Column::new_unqualified("value")), + }, + &schema, + )?; + + match planned { + PlannerResult::Planned(Expr::BinaryExpr(expr)) => { + assert_eq!(expr.op, Operator::Eq); + + match expr.left.as_ref() { + Expr::ScalarFunction(func) => { + assert_eq!(func.func.name(), JsonGetWithType::NAME); + assert_eq!(func.args.len(), 3); + assert_eq!(func.args[2], Expr::Literal(ScalarValue::Int64(None), None)); + } + other => panic!("expected json_get on left side, got {other:?}"), + } + + assert_eq!( + expr.right.as_ref(), + &Expr::Column(Column::new_unqualified("value")) + ); + } + other => panic!("expected planned binary expression, got {other:?}"), + } + + let original = planner.plan_binary_op( + RawBinaryExpr { + op: BinaryOperator::StringConcat, + left: Expr::Column(Column::new_unqualified("value")), + right: Expr::Literal(ScalarValue::Utf8(Some("x".to_string())), None), + }, + &schema, + )?; + + match original { + PlannerResult::Original(expr) => { + assert!(matches!(expr.op, BinaryOperator::StringConcat)); + assert_eq!(expr.left, Expr::Column(Column::new_unqualified("value"))); + assert_eq!( + expr.right, + Expr::Literal(ScalarValue::Utf8(Some("x".to_string())), None) + ); + } + other => panic!( + "expected original expression for unsupported operator, got {:?}", + other, + ), + } + + Ok(()) + } + + #[test] + fn test_plan_compound_identifier() -> Result<()> { + let planner = JsonExprPlanner; + let qualifier = TableReference::bare("events"); + let nested_names = vec!["payload".to_string(), "cpu".to_string()]; + + let planned = planner.plan_compound_identifier( + &json_field("labels"), + Some(&qualifier), + &nested_names, + )?; + + match planned { + PlannerResult::Planned(Expr::ScalarFunction(func)) => { + assert_eq!(func.func.name(), JsonGetWithType::NAME); + assert_eq!(func.args.len(), 2); + assert_eq!( + func.args[0], + Expr::Column(Column::new(Some(qualifier.clone()), "labels")) + ); + assert_eq!( + func.args[1], + Expr::Literal(ScalarValue::Utf8(Some("payload.cpu".to_string())), None) + ); + } + other => panic!("expected json_get scalar function, got {other:?}"), + } + + let original = planner.plan_compound_identifier( + &Field::new("plain", DataType::Utf8, true), + Some(&qualifier), + &nested_names, + )?; + + match original { + PlannerResult::Original(exprs) => assert!(exprs.is_empty()), + other => panic!( + "expected original empty result for non-json field, got {:?}", + other, + ), + } + + Ok(()) + } +} diff --git a/src/query/src/datafusion/planner.rs b/src/query/src/datafusion/planner.rs index d9c74b9d5a..856f2189d2 100644 --- a/src/query/src/datafusion/planner.rs +++ b/src/query/src/datafusion/planner.rs @@ -38,6 +38,7 @@ use datafusion_sql::parser::Statement as DfStatement; use session::context::QueryContextRef; use snafu::{Location, ResultExt}; +use crate::datafusion::json_expr_planner::JsonExprPlanner; use crate::error::{CatalogSnafu, Result}; use crate::query_engine::{DefaultPlanDecoder, QueryEngineState}; @@ -87,6 +88,9 @@ impl DfContextProviderAdapter { .map(|format| (format.get_ext().to_lowercase(), format)) .collect(); + let mut expr_planners = SessionStateDefaults::default_expr_planners(); + expr_planners.insert(0, Arc::new(JsonExprPlanner)); + Ok(Self { engine_state, session_state, @@ -94,7 +98,7 @@ impl DfContextProviderAdapter { table_provider, query_ctx, file_formats, - expr_planners: SessionStateDefaults::default_expr_planners(), + expr_planners, }) } } diff --git a/tests/cases/standalone/common/types/json/json2.result b/tests/cases/standalone/common/types/json/json2.result index e13e2307e1..abf0a4eda2 100644 --- a/tests/cases/standalone/common/types/json/json2.result +++ b/tests/cases/standalone/common/types/json/json2.result @@ -62,6 +62,36 @@ values (9, '{"a": {"x": true}, "c": "s9", "d": [{"e": {"g": -0.9}}]}'), Affected Rows: 2 +-- SQLNESS REPLACE (peers.*) REDACTED +explain select j.a.b from json2_table; + ++---------------+---------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false, remote_input=[ | +| | Projection: json_get(json2_table.j, Utf8("a.b")) | +| | TableScan: json2_table | +| | ]] | +| physical_plan | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+---------------------------------------------------+ + +-- SQLNESS REPLACE (peers.*) REDACTED +explain select j.a.x::bool from json2_table; + ++---------------+------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false, remote_input=[ | +| | Projection: json_get(json2_table.j, Utf8("a.x"), Boolean(NULL)) AS arrow_cast(json_get(json2_table.j,Utf8("a.x")),Utf8("Boolean")) | +| | TableScan: json2_table | +| | ]] | +| physical_plan | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+------------------------------------------------------------------------------------------------------------------------------------+ + drop table json2_table; Affected Rows: 0 diff --git a/tests/cases/standalone/common/types/json/json2.sql b/tests/cases/standalone/common/types/json/json2.sql index 3644f06be4..5b5aba75e6 100644 --- a/tests/cases/standalone/common/types/json/json2.sql +++ b/tests/cases/standalone/common/types/json/json2.sql @@ -32,4 +32,10 @@ insert into json2_table values (9, '{"a": {"x": true}, "c": "s9", "d": [{"e": {"g": -0.9}}]}'), (10, '{"a": {"b": 10}, "y": false}'); +-- SQLNESS REPLACE (peers.*) REDACTED +explain select j.a.b from json2_table; + +-- SQLNESS REPLACE (peers.*) REDACTED +explain select j.a.x::bool from json2_table; + drop table json2_table;