From 0c54e70e1f2b119f99b76e7ea6caf0a429b0255c Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Mon, 29 Dec 2025 12:35:53 +0800 Subject: [PATCH] feat: impl `json_get_string` with new json type (#7489) * impl `json_get_string` with new json type Signed-off-by: luofucong * resolve PR comments Signed-off-by: luofucong * fix ci Signed-off-by: luofucong --------- Signed-off-by: luofucong --- Cargo.lock | 2 + Cargo.toml | 1 + src/common/function/Cargo.toml | 2 + .../function/src/scalars/json/json_get.rs | 356 +++++++++++++++--- src/datatypes/src/arrow_array.rs | 26 ++ tests-integration/tests/jsonbench.rs | 41 ++ 6 files changed, 378 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8f1be09fa0..6a21760ffb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2190,6 +2190,7 @@ dependencies = [ "approx 0.5.1", "arc-swap", "arrow", + "arrow-cast", "arrow-schema", "async-trait", "bincode", @@ -2220,6 +2221,7 @@ dependencies = [ "h3o", "hyperloglogplus", "jsonb", + "jsonpath-rust 0.7.5", "memchr", "mito-codec", "nalgebra", diff --git a/Cargo.toml b/Cargo.toml index 0ec930a845..e0e2701f85 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -103,6 +103,7 @@ aquamarine = "0.6" arrow = { version = "56.2", features = ["prettyprint"] } arrow-array = { version = "56.2", default-features = false, features = ["chrono-tz"] } arrow-buffer = "56.2" +arrow-cast = "56.2" arrow-flight = "56.2" arrow-ipc = { version = "56.2", default-features = false, features = ["lz4", "zstd"] } arrow-schema = { version = "56.2", features = ["serde"] } diff --git a/src/common/function/Cargo.toml b/src/common/function/Cargo.toml index 7766f857d0..d0f7ac685f 100644 --- a/src/common/function/Cargo.toml +++ b/src/common/function/Cargo.toml @@ -17,6 +17,7 @@ ahash.workspace = true api.workspace = true arc-swap = "1.0" arrow.workspace = true +arrow-cast.workspace = true arrow-schema.workspace = true async-trait.workspace = true bincode = "=1.3.3" @@ -46,6 +47,7 @@ geohash = { version = "0.13", optional = true } h3o = { version = "0.6", optional = true } hyperloglogplus = "0.4" jsonb.workspace = true +jsonpath-rust = "0.7.5" memchr = "2.7" mito-codec.workspace = true nalgebra.workspace = true diff --git a/src/common/function/src/scalars/json/json_get.rs b/src/common/function/src/scalars/json/json_get.rs index 92ea9cf990..d6c135bd82 100644 --- a/src/common/function/src/scalars/json/json_get.rs +++ b/src/common/function/src/scalars/json/json_get.rs @@ -13,17 +13,24 @@ // limitations under the License. use std::fmt::{self, Display}; +use std::str::FromStr; use std::sync::Arc; +use arrow::array::{ArrayRef, BinaryViewArray, StringViewArray, StructArray}; use arrow::compute; -use datafusion_common::DataFusionError; +use arrow::datatypes::{Float64Type, Int64Type, UInt64Type}; use datafusion_common::arrow::array::{ Array, AsArray, BinaryViewBuilder, BooleanBuilder, Float64Builder, Int64Builder, StringViewBuilder, }; use datafusion_common::arrow::datatypes::DataType; +use datafusion_common::{DataFusionError, Result}; use datafusion_expr::type_coercion::aggregates::STRINGS; -use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature}; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility}; +use datatypes::arrow_array::string_array_value_at_index; +use datatypes::json::JsonStructureSettings; +use jsonpath_rust::JsonPath; +use serde_json::Value; use crate::function::{Function, extract_args}; use crate::helper; @@ -158,11 +165,7 @@ impl JsonGetString { impl Default for JsonGetString { fn default() -> Self { Self { - // TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type. - signature: helper::one_of_sigs2( - vec![DataType::Binary, DataType::BinaryView], - vec![DataType::Utf8, DataType::Utf8View], - ), + signature: Signature::any(2, Volatility::Immutable), } } } @@ -172,7 +175,7 @@ impl Function for JsonGetString { Self::NAME } - fn return_type(&self, _: &[DataType]) -> datafusion_common::Result { + fn return_type(&self, _: &[DataType]) -> Result { Ok(DataType::Utf8View) } @@ -180,33 +183,203 @@ impl Function for JsonGetString { &self.signature } - fn invoke_with_args( - &self, - args: ScalarFunctionArgs, - ) -> datafusion_common::Result { + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { let [arg0, arg1] = extract_args(self.name(), &args)?; - let arg0 = compute::cast(&arg0, &DataType::BinaryView)?; - let jsons = arg0.as_binary_view(); + let arg1 = compute::cast(&arg1, &DataType::Utf8View)?; let paths = arg1.as_string_view(); - let size = jsons.len(); - let mut builder = StringViewBuilder::with_capacity(size); + let result = match arg0.data_type() { + DataType::Binary | DataType::LargeBinary | DataType::BinaryView => { + let arg0 = compute::cast(&arg0, &DataType::BinaryView)?; + let jsons = arg0.as_binary_view(); + jsonb_get_string(jsons, paths)? + } + DataType::Struct(_) => { + let jsons = arg0.as_struct(); + json_struct_get_string(jsons, paths)? + } + _ => { + return Err(DataFusionError::Execution(format!( + "{} not supported argument type {}", + Self::NAME, + arg0.data_type(), + ))); + } + }; - for i in 0..size { - let json = jsons.is_valid(i).then(|| jsons.value(i)); - let path = paths.is_valid(i).then(|| paths.value(i)); - let result = match (json, path) { - (Some(json), Some(path)) => { - get_json_by_path(json, path).and_then(|json| jsonb::to_str(&json).ok()) - } - _ => None, + Ok(ColumnarValue::Array(result)) + } +} + +fn jsonb_get_string(jsons: &BinaryViewArray, paths: &StringViewArray) -> Result { + let size = jsons.len(); + let mut builder = StringViewBuilder::with_capacity(size); + + for i in 0..size { + let json = jsons.is_valid(i).then(|| jsons.value(i)); + let path = paths.is_valid(i).then(|| paths.value(i)); + let result = match (json, path) { + (Some(json), Some(path)) => { + get_json_by_path(json, path).and_then(|json| jsonb::to_str(&json).ok()) + } + _ => None, + }; + builder.append_option(result); + } + + Ok(Arc::new(builder.finish())) +} + +fn json_struct_get_string(jsons: &StructArray, paths: &StringViewArray) -> Result { + let size = jsons.len(); + let mut builder = StringViewBuilder::with_capacity(size); + + for i in 0..size { + if jsons.is_null(i) || paths.is_null(i) { + builder.append_null(); + continue; + } + let path = paths.value(i); + + // naively assume the JSON path is our kind of indexing to the field, by removing its "root" + let field_path = path.trim().replace("$.", ""); + let column = jsons.column_by_name(&field_path); + + if let Some(column) = column { + if let Some(v) = string_array_value_at_index(column, i) { + builder.append_value(v); + } else { + builder.append_value(arrow_cast::display::array_value_to_string(column, i)?); + } + } else { + let Some(raw) = jsons + .column_by_name(JsonStructureSettings::RAW_FIELD) + .and_then(|x| string_array_value_at_index(x, i)) + else { + builder.append_null(); + continue; }; - builder.append_option(result); + + let path: JsonPath = JsonPath::try_from(path).map_err(|e| { + DataFusionError::Execution(format!("{path} is not a valid JSON path: {e}")) + })?; + // the wanted field is not retrievable from the JSON struct columns directly, we have + // to combine everything (columns and the "_raw") into a complete JSON value to find it + let value = json_struct_to_value(raw, jsons, i)?; + + match path.find(&value) { + Value::Null => builder.append_null(), + Value::Array(values) => match values.as_slice() { + [] => builder.append_null(), + [x] => { + if let Some(s) = x.as_str() { + builder.append_value(s) + } else { + builder.append_value(x.to_string()) + } + } + x => builder.append_value( + x.iter() + .map(|v| v.to_string()) + .collect::>() + .join(", "), + ), + }, + // Safety: guarded by the returns of `path.find` as documented + _ => unreachable!(), + } + } + } + + Ok(Arc::new(builder.finish())) +} + +fn json_struct_to_value(raw: &str, jsons: &StructArray, i: usize) -> Result { + let Ok(mut json) = Value::from_str(raw) else { + return Err(DataFusionError::Internal(format!( + "inner field '{}' is not a valid JSON string", + JsonStructureSettings::RAW_FIELD + ))); + }; + + for (column_name, column) in jsons.column_names().into_iter().zip(jsons.columns()) { + if column_name == JsonStructureSettings::RAW_FIELD { + continue; } - Ok(ColumnarValue::Array(Arc::new(builder.finish()))) + let (json_pointer, field) = if let Some((json_object, field)) = column_name.rsplit_once(".") + { + let json_pointer = format!("/{}", json_object.replace(".", "/")); + (json_pointer, field) + } else { + ("".to_string(), column_name) + }; + let Some(json_object) = json + .pointer_mut(&json_pointer) + .and_then(|x| x.as_object_mut()) + else { + return Err(DataFusionError::Internal(format!( + "value at JSON pointer '{}' is not an object", + json_pointer + ))); + }; + + macro_rules! insert { + ($column: ident, $i: ident, $json_object: ident, $field: ident) => {{ + if let Some(value) = $column + .is_valid($i) + .then(|| serde_json::Value::from($column.value($i))) + { + $json_object.insert($field.to_string(), value); + } + }}; + } + + match column.data_type() { + // boolean => Value::Bool + DataType::Boolean => { + let column = column.as_boolean(); + insert!(column, i, json_object, field); + } + // int => Value::Number + DataType::Int64 => { + let column = column.as_primitive::(); + insert!(column, i, json_object, field); + } + DataType::UInt64 => { + let column = column.as_primitive::(); + insert!(column, i, json_object, field); + } + DataType::Float64 => { + let column = column.as_primitive::(); + insert!(column, i, json_object, field); + } + // string => Value::String + DataType::Utf8 => { + let column = column.as_string::(); + insert!(column, i, json_object, field); + } + DataType::LargeUtf8 => { + let column = column.as_string::(); + insert!(column, i, json_object, field); + } + DataType::Utf8View => { + let column = column.as_string_view(); + insert!(column, i, json_object, field); + } + // other => Value::Array and Value::Object + _ => { + return Err(DataFusionError::NotImplemented(format!( + "{} is not yet supported to be executed with field {} of datatype {}", + JsonGetString::NAME, + column_name, + column.data_type() + ))); + } + } } + Ok(json) } impl Display for JsonGetString { @@ -296,11 +469,13 @@ impl Display for JsonGetObject { mod tests { use std::sync::Arc; + use arrow::array::{Float64Array, Int64Array, StructArray}; use arrow_schema::Field; use datafusion_common::ScalarValue; use datafusion_common::arrow::array::{BinaryArray, BinaryViewArray, StringArray}; use datafusion_common::arrow::datatypes::{Float64Type, Int64Type}; use datatypes::types::parse_string_to_jsonb; + use serde_json::json; use super::*; @@ -474,42 +649,123 @@ mod tests { r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#, r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#, ]; - let paths = vec!["$.a.b", "$.a", ""]; - let results = [Some("a"), Some("d"), None]; - let jsonbs = json_strings + // complete JSON is: + // { + // "kind": "foo", + // "payload": { + // "code": 404, + // "success": false, + // "result": { + // "error": "not found", + // "time_cost": 1.234 + // } + // } + // } + let json_struct: ArrayRef = Arc::new(StructArray::new( + vec![ + Field::new("kind", DataType::Utf8, true), + Field::new("payload.code", DataType::Int64, true), + Field::new("payload.result.time_cost", DataType::Float64, true), + Field::new(JsonStructureSettings::RAW_FIELD, DataType::Utf8View, true), + ] + .into(), + vec![ + Arc::new(StringArray::from_iter([Some("foo")])) as ArrayRef, + Arc::new(Int64Array::from_iter([Some(404)])), + Arc::new(Float64Array::from_iter([Some(1.234)])), + Arc::new(StringViewArray::from_iter([Some( + json! ({ + "payload": { + "success": false, + "result": { + "error": "not found" + } + } + }) + .to_string(), + )])), + ], + None, + )); + + let paths = vec![ + "$.a.b", + "$.a", + "", + "$.kind", + "$.payload.code", + "$.payload.result.time_cost", + "$.payload", + "$.payload.success", + "$.payload.result", + "$.payload.result.error", + "$.payload.result.not-exists", + "$.payload.not-exists", + "$.not-exists", + "$", + ]; + let expects = [ + Some("a"), + Some("d"), + None, + Some("foo"), + Some("404"), + Some("1.234"), + Some( + r#"{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}"#, + ), + Some("false"), + Some(r#"{"error":"not found","time_cost":1.234}"#), + Some("not found"), + None, + None, + None, + Some( + r#"{"kind":"foo","payload":{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}}"#, + ), + ]; + + let mut jsons = json_strings .iter() .map(|s| { let value = jsonb::parse_value(s.as_bytes()).unwrap(); - value.to_vec() + Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef }) .collect::>(); + let json_struct_arrays = + std::iter::repeat_n(json_struct, expects.len() - jsons.len()).collect::>(); + jsons.extend(json_struct_arrays); - let args = ScalarFunctionArgs { - args: vec![ - ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))), - ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))), - ], - arg_fields: vec![], - number_rows: 3, - return_field: Arc::new(Field::new("x", DataType::Utf8View, false)), - config_options: Arc::new(Default::default()), - }; - let result = json_get_string - .invoke_with_args(args) - .and_then(|x| x.to_array(3)) - .unwrap(); - let vector = result.as_string_view(); + for i in 0..jsons.len() { + let json = &jsons[i]; + let path = paths[i]; + let expect = expects[i]; - assert_eq!(3, vector.len()); - for (i, gt) in results.iter().enumerate() { - let result = vector.is_valid(i).then(|| vector.value(i)); - assert_eq!(*gt, result); + let args = ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(json.clone()), + ColumnarValue::Scalar(path.into()), + ], + arg_fields: vec![], + number_rows: 1, + return_field: Arc::new(Field::new("x", DataType::Utf8View, false)), + config_options: Arc::new(Default::default()), + }; + let result = json_get_string + .invoke_with_args(args) + .and_then(|x| x.to_array(1)) + .unwrap(); + + let result = result.as_string_view(); + assert_eq!(1, result.len()); + let actual = result.is_valid(0).then(|| result.value(0)); + assert_eq!(actual, expect); } } #[test] - fn test_json_get_object() -> datafusion_common::Result<()> { + fn test_json_get_object() -> Result<()> { let udf = JsonGetObject::default(); assert_eq!("json_get_object", udf.name()); assert_eq!( diff --git a/src/datatypes/src/arrow_array.rs b/src/datatypes/src/arrow_array.rs index ac5e6444af..97ed7d1bbf 100644 --- a/src/datatypes/src/arrow_array.rs +++ b/src/datatypes/src/arrow_array.rs @@ -19,6 +19,7 @@ use arrow::datatypes::{ Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, }; +use arrow_array::Array; use common_time::time::Time; use common_time::{Duration, Timestamp}; @@ -126,3 +127,28 @@ pub fn duration_array_value(array: &ArrayRef, i: usize) -> Duration { }; Duration::new(v, time_unit.into()) } + +/// Get the string value at index `i` for `Utf8`, `LargeUtf8`, or `Utf8View` arrays. +/// +/// Returns `None` when the array type is not a string type or the value is null. +/// +/// # Panics +/// +/// If index `i` is out of bounds. +pub fn string_array_value_at_index(array: &ArrayRef, i: usize) -> Option<&str> { + match array.data_type() { + DataType::Utf8 => { + let array = array.as_string::(); + array.is_valid(i).then(|| array.value(i)) + } + DataType::LargeUtf8 => { + let array = array.as_string::(); + array.is_valid(i).then(|| array.value(i)) + } + DataType::Utf8View => { + let array = array.as_string_view(); + array.is_valid(i).then(|| array.value(i)) + } + _ => None, + } +} diff --git a/tests-integration/tests/jsonbench.rs b/tests-integration/tests/jsonbench.rs index 33c83b0cbb..f5c66bf6ee 100644 --- a/tests-integration/tests/jsonbench.rs +++ b/tests-integration/tests/jsonbench.rs @@ -56,6 +56,47 @@ async fn query_data(frontend: &Arc) -> io::Result<()> { ))?; execute_sql_and_expect(frontend, sql, &expected).await; + // query 1: + let sql = "\ +SELECT \ + json_get_string(data, '$.commit.collection') AS event, count() AS count \ +FROM bluesky \ +GROUP BY event \ +ORDER BY count DESC, event ASC"; + let expected = r#" ++-----------------------+-------+ +| event | count | ++-----------------------+-------+ +| app.bsky.feed.like | 3 | +| app.bsky.feed.post | 3 | +| app.bsky.graph.follow | 3 | +| app.bsky.feed.repost | 1 | ++-----------------------+-------+"#; + execute_sql_and_expect(frontend, sql, expected).await; + + // query 2: + let sql = "\ +SELECT \ + json_get_string(data, '$.commit.collection') AS event, \ + count() AS count, \ + count(DISTINCT json_get_string(data, '$.did')) AS users \ +FROM bluesky \ +WHERE \ + (json_get_string(data, '$.kind') = 'commit') AND \ + (json_get_string(data, '$.commit.operation') = 'create') \ +GROUP BY event \ +ORDER BY count DESC, event ASC"; + let expected = r#" ++-----------------------+-------+-------+ +| event | count | users | ++-----------------------+-------+-------+ +| app.bsky.feed.like | 3 | 3 | +| app.bsky.feed.post | 3 | 3 | +| app.bsky.graph.follow | 3 | 3 | +| app.bsky.feed.repost | 1 | 1 | ++-----------------------+-------+-------+"#; + execute_sql_and_expect(frontend, sql, expected).await; + Ok(()) }