From dc9fc582a0f0ade7ce6ee910ff16be3e82a82a4d Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Tue, 30 Dec 2025 17:42:16 +0800 Subject: [PATCH] feat: impl `json_get_int` for new json type (#7495) Update src/common/function/src/scalars/json/json_get.rs impl `json_get_int` for new json type Signed-off-by: luofucong --- .../function/src/scalars/json/json_get.rs | 399 ++++++++++++------ src/datatypes/src/arrow_array.rs | 66 ++- tests-integration/tests/jsonbench.rs | 74 ++++ 3 files changed, 405 insertions(+), 134 deletions(-) diff --git a/src/common/function/src/scalars/json/json_get.rs b/src/common/function/src/scalars/json/json_get.rs index d6c135bd82..40bd78cbc4 100644 --- a/src/common/function/src/scalars/json/json_get.rs +++ b/src/common/function/src/scalars/json/json_get.rs @@ -27,7 +27,7 @@ use datafusion_common::arrow::datatypes::DataType; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::type_coercion::aggregates::STRINGS; use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility}; -use datatypes::arrow_array::string_array_value_at_index; +use datatypes::arrow_array::{int_array_value_at_index, string_array_value_at_index}; use datatypes::json::JsonStructureSettings; use jsonpath_rust::JsonPath; use serde_json::Value; @@ -131,13 +131,6 @@ macro_rules! json_get { }; } -json_get!( - JsonGetInt, - Int64, - i64, - "Get the value from the JSONB by the given path and return it as an integer." -); - json_get!( JsonGetFloat, Float64, @@ -152,17 +145,65 @@ json_get!( "Get the value from the JSONB by the given path and return it as a boolean." ); -/// Get the value from the JSONB by the given path and return it as a string. -#[derive(Clone, Debug)] -pub struct JsonGetString { +enum JsonResultValue<'a> { + Jsonb(Vec), + JsonStructByColumn(&'a ArrayRef, usize), + JsonStructByValue(&'a Value), +} + +trait JsonGetResultBuilder { + fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()>; + + fn append_null(&mut self); + + fn build(&mut self) -> ArrayRef; +} + +/// Common implementation for JSON get scalar functions. +/// +/// `JsonGet` encapsulates the logic for extracting values from JSON inputs +/// based on a path expression. Different JSON get functions reuse this +/// implementation by supplying their own `JsonGetResultBuilder` to control +/// how the resulting values are materialized into an Arrow array. +struct JsonGet { signature: Signature, } -impl JsonGetString { - pub const NAME: &'static str = "json_get_string"; +impl JsonGet { + fn invoke(&self, args: ScalarFunctionArgs, builder_factory: F) -> Result + where + F: Fn(usize) -> B, + B: JsonGetResultBuilder, + { + let [arg0, arg1] = extract_args("JSON_GET", &args)?; + + let arg1 = compute::cast(&arg1, &DataType::Utf8View)?; + let paths = arg1.as_string_view(); + + let mut builder = (builder_factory)(arg0.len()); + match arg0.data_type() { + DataType::Binary | DataType::LargeBinary | DataType::BinaryView => { + let arg0 = compute::cast(&arg0, &DataType::BinaryView)?; + let jsons = arg0.as_binary_view(); + jsonb_get(jsons, paths, &mut builder)?; + } + DataType::Struct(_) => { + let jsons = arg0.as_struct(); + json_struct_get(jsons, paths, &mut builder)? + } + _ => { + return Err(DataFusionError::Execution(format!( + "JSON_GET not supported argument type {}", + arg0.data_type(), + ))); + } + }; + + Ok(ColumnarValue::Array(builder.build())) + } } -impl Default for JsonGetString { +impl Default for JsonGet { fn default() -> Self { Self { signature: Signature::any(2, Volatility::Immutable), @@ -170,6 +211,13 @@ impl Default for JsonGetString { } } +#[derive(Default)] +pub struct JsonGetString(JsonGet); + +impl JsonGetString { + pub const NAME: &'static str = "json_get_string"; +} + impl Function for JsonGetString { fn name(&self) -> &str { Self::NAME @@ -180,61 +228,142 @@ impl Function for JsonGetString { } fn signature(&self) -> &Signature { - &self.signature + &self.0.signature } fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - let [arg0, arg1] = extract_args(self.name(), &args)?; + struct StringResultBuilder(StringViewBuilder); - let arg1 = compute::cast(&arg1, &DataType::Utf8View)?; - let paths = arg1.as_string_view(); + impl JsonGetResultBuilder for StringResultBuilder { + fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> { + match value { + JsonResultValue::Jsonb(value) => { + self.0.append_option(jsonb::to_str(&value).ok()) + } + JsonResultValue::JsonStructByColumn(column, i) => { + if let Some(v) = string_array_value_at_index(column, i) { + self.0.append_value(v); + } else { + self.0 + .append_value(arrow_cast::display::array_value_to_string( + column, i, + )?); + } + } + JsonResultValue::JsonStructByValue(value) => { + if let Some(s) = value.as_str() { + self.0.append_value(s) + } else { + self.0.append_value(value.to_string()) + } + } + } + Ok(()) + } - let result = match arg0.data_type() { - DataType::Binary | DataType::LargeBinary | DataType::BinaryView => { - let arg0 = compute::cast(&arg0, &DataType::BinaryView)?; - let jsons = arg0.as_binary_view(); - jsonb_get_string(jsons, paths)? + fn append_null(&mut self) { + self.0.append_null(); } - DataType::Struct(_) => { - let jsons = arg0.as_struct(); - json_struct_get_string(jsons, paths)? - } - _ => { - return Err(DataFusionError::Execution(format!( - "{} not supported argument type {}", - Self::NAME, - arg0.data_type(), - ))); - } - }; - Ok(ColumnarValue::Array(result)) + fn build(&mut self) -> ArrayRef { + Arc::new(self.0.finish()) + } + } + + self.0.invoke(args, |len: usize| { + StringResultBuilder(StringViewBuilder::with_capacity(len)) + }) } } -fn jsonb_get_string(jsons: &BinaryViewArray, paths: &StringViewArray) -> Result { - let size = jsons.len(); - let mut builder = StringViewBuilder::with_capacity(size); +#[derive(Default)] +pub struct JsonGetInt(JsonGet); +impl JsonGetInt { + pub const NAME: &'static str = "json_get_int"; +} + +impl Function for JsonGetInt { + fn name(&self) -> &str { + Self::NAME + } + + fn return_type(&self, _: &[DataType]) -> Result { + Ok(DataType::Int64) + } + + fn signature(&self) -> &Signature { + &self.0.signature + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + struct IntResultBuilder(Int64Builder); + + impl JsonGetResultBuilder for IntResultBuilder { + fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> { + match value { + JsonResultValue::Jsonb(value) => { + self.0.append_option(jsonb::to_i64(&value).ok()) + } + JsonResultValue::JsonStructByColumn(column, i) => { + self.0.append_option(int_array_value_at_index(column, i)) + } + JsonResultValue::JsonStructByValue(value) => { + self.0.append_option(value.as_i64()) + } + } + Ok(()) + } + + fn append_null(&mut self) { + self.0.append_null(); + } + + fn build(&mut self) -> ArrayRef { + Arc::new(self.0.finish()) + } + } + + self.0.invoke(args, |len: usize| { + IntResultBuilder(Int64Builder::with_capacity(len)) + }) + } +} + +impl Display for JsonGetInt { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", Self::NAME.to_ascii_uppercase()) + } +} + +fn jsonb_get( + jsons: &BinaryViewArray, + paths: &StringViewArray, + builder: &mut impl JsonGetResultBuilder, +) -> Result<()> { + let size = jsons.len(); for i in 0..size { let json = jsons.is_valid(i).then(|| jsons.value(i)); let path = paths.is_valid(i).then(|| paths.value(i)); let result = match (json, path) { - (Some(json), Some(path)) => { - get_json_by_path(json, path).and_then(|json| jsonb::to_str(&json).ok()) - } + (Some(json), Some(path)) => get_json_by_path(json, path), _ => None, }; - builder.append_option(result); + if let Some(v) = result { + builder.append_value(JsonResultValue::Jsonb(v))?; + } else { + builder.append_null(); + } } - - Ok(Arc::new(builder.finish())) + Ok(()) } -fn json_struct_get_string(jsons: &StructArray, paths: &StringViewArray) -> Result { +fn json_struct_get( + jsons: &StructArray, + paths: &StringViewArray, + builder: &mut impl JsonGetResultBuilder, +) -> Result<()> { let size = jsons.len(); - let mut builder = StringViewBuilder::with_capacity(size); - for i in 0..size { if jsons.is_null(i) || paths.is_null(i) { builder.append_null(); @@ -247,11 +376,7 @@ fn json_struct_get_string(jsons: &StructArray, paths: &StringViewArray) -> Resul let column = jsons.column_by_name(&field_path); if let Some(column) = column { - if let Some(v) = string_array_value_at_index(column, i) { - builder.append_value(v); - } else { - builder.append_value(arrow_cast::display::array_value_to_string(column, i)?); - } + builder.append_value(JsonResultValue::JsonStructByColumn(column, i))?; } else { let Some(raw) = jsons .column_by_name(JsonStructureSettings::RAW_FIELD) @@ -272,27 +397,15 @@ fn json_struct_get_string(jsons: &StructArray, paths: &StringViewArray) -> Resul Value::Null => builder.append_null(), Value::Array(values) => match values.as_slice() { [] => builder.append_null(), - [x] => { - if let Some(s) = x.as_str() { - builder.append_value(s) - } else { - builder.append_value(x.to_string()) - } - } - x => builder.append_value( - x.iter() - .map(|v| v.to_string()) - .collect::>() - .join(", "), - ), + [x] => builder.append_value(JsonResultValue::JsonStructByValue(x))?, + _ => builder.append_value(JsonResultValue::JsonStructByValue(&value))?, }, - // Safety: guarded by the returns of `path.find` as documented - _ => unreachable!(), + value => builder.append_value(JsonResultValue::JsonStructByValue(&value))?, } } } - Ok(Arc::new(builder.finish())) + Ok(()) } fn json_struct_to_value(raw: &str, jsons: &StructArray, i: usize) -> Result { @@ -479,6 +592,50 @@ mod tests { use super::*; + /// Create a JSON object like this (as a one element struct array for testing): + /// + /// ```JSON + /// { + /// "kind": "foo", + /// "payload": { + /// "code": 404, + /// "success": false, + /// "result": { + /// "error": "not found", + /// "time_cost": 1.234 + /// } + /// } + /// } + /// ``` + fn test_json_struct() -> ArrayRef { + Arc::new(StructArray::new( + vec![ + Field::new("kind", DataType::Utf8, true), + Field::new("payload.code", DataType::Int64, true), + Field::new("payload.result.time_cost", DataType::Float64, true), + Field::new(JsonStructureSettings::RAW_FIELD, DataType::Utf8View, true), + ] + .into(), + vec![ + Arc::new(StringArray::from_iter([Some("foo")])) as ArrayRef, + Arc::new(Int64Array::from_iter([Some(404)])), + Arc::new(Float64Array::from_iter([Some(1.234)])), + Arc::new(StringViewArray::from_iter([Some( + json! ({ + "payload": { + "success": false, + "result": { + "error": "not found" + } + } + }) + .to_string(), + )])), + ], + None, + )) + } + #[test] fn test_json_get_int() { let json_get_int = JsonGetInt::default(); @@ -496,37 +653,55 @@ mod tests { r#"{"a": 4, "b": {"c": 6}, "c": 6}"#, r#"{"a": 7, "b": 8, "c": {"a": 7}}"#, ]; - let paths = vec!["$.a.b", "$.a", "$.c"]; - let results = [Some(2), Some(4), None]; + let json_struct = test_json_struct(); - let jsonbs = json_strings + let path_expects = vec![ + ("$.a.b", Some(2)), + ("$.a", Some(4)), + ("$.c", None), + ("$.kind", None), + ("$.payload.code", Some(404)), + ("$.payload.success", None), + ("$.payload.result.time_cost", None), + ("$.payload.not-exists", None), + ("$.not-exists", None), + ("$", None), + ]; + + let mut jsons = json_strings .iter() .map(|s| { let value = jsonb::parse_value(s.as_bytes()).unwrap(); - value.to_vec() + Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef }) .collect::>(); + let json_struct_arrays = + std::iter::repeat_n(json_struct, path_expects.len() - jsons.len()).collect::>(); + jsons.extend(json_struct_arrays); - let args = ScalarFunctionArgs { - args: vec![ - ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))), - ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))), - ], - arg_fields: vec![], - number_rows: 3, - return_field: Arc::new(Field::new("x", DataType::Int64, false)), - config_options: Arc::new(Default::default()), - }; - let result = json_get_int - .invoke_with_args(args) - .and_then(|x| x.to_array(3)) - .unwrap(); - let vector = result.as_primitive::(); + for i in 0..jsons.len() { + let json = &jsons[i]; + let (path, expect) = path_expects[i]; - assert_eq!(3, vector.len()); - for (i, gt) in results.iter().enumerate() { - let result = vector.is_valid(i).then(|| vector.value(i)); - assert_eq!(*gt, result); + let args = ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(json.clone()), + ColumnarValue::Scalar(path.into()), + ], + arg_fields: vec![], + number_rows: 1, + return_field: Arc::new(Field::new("x", DataType::Int64, false)), + config_options: Arc::new(Default::default()), + }; + let result = json_get_int + .invoke_with_args(args) + .and_then(|x| x.to_array(1)) + .unwrap(); + + let result = result.as_primitive::(); + assert_eq!(1, result.len()); + let actual = result.is_valid(0).then(|| result.value(0)); + assert_eq!(actual, expect); } } @@ -649,45 +824,7 @@ mod tests { r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#, r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#, ]; - - // complete JSON is: - // { - // "kind": "foo", - // "payload": { - // "code": 404, - // "success": false, - // "result": { - // "error": "not found", - // "time_cost": 1.234 - // } - // } - // } - let json_struct: ArrayRef = Arc::new(StructArray::new( - vec![ - Field::new("kind", DataType::Utf8, true), - Field::new("payload.code", DataType::Int64, true), - Field::new("payload.result.time_cost", DataType::Float64, true), - Field::new(JsonStructureSettings::RAW_FIELD, DataType::Utf8View, true), - ] - .into(), - vec![ - Arc::new(StringArray::from_iter([Some("foo")])) as ArrayRef, - Arc::new(Int64Array::from_iter([Some(404)])), - Arc::new(Float64Array::from_iter([Some(1.234)])), - Arc::new(StringViewArray::from_iter([Some( - json! ({ - "payload": { - "success": false, - "result": { - "error": "not found" - } - } - }) - .to_string(), - )])), - ], - None, - )); + let json_struct = test_json_struct(); let paths = vec![ "$.a.b", diff --git a/src/datatypes/src/arrow_array.rs b/src/datatypes/src/arrow_array.rs index 97ed7d1bbf..9b64c88bf3 100644 --- a/src/datatypes/src/arrow_array.rs +++ b/src/datatypes/src/arrow_array.rs @@ -15,9 +15,10 @@ use arrow::array::{ArrayRef, AsArray}; use arrow::datatypes::{ DataType, DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType, - DurationSecondType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType, - Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, - TimestampNanosecondType, TimestampSecondType, + DurationSecondType, Int8Type, Int16Type, Int32Type, Int64Type, Time32MillisecondType, + Time32SecondType, Time64MicrosecondType, Time64NanosecondType, TimeUnit, + TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, + TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type, }; use arrow_array::Array; use common_time::time::Time; @@ -152,3 +153,62 @@ pub fn string_array_value_at_index(array: &ArrayRef, i: usize) -> Option<&str> { _ => None, } } + +/// Get the integer value (`i64`) at index `i` for any integer array. +/// +/// Returns `None` when: +/// +/// - the array type is not an integer type; +/// - the value is larger than `i64::MAX`; +/// - the value is null. +/// +/// # Panics +/// +/// If index `i` is out of bounds. +pub fn int_array_value_at_index(array: &ArrayRef, i: usize) -> Option { + match array.data_type() { + DataType::Int8 => { + let array = array.as_primitive::(); + array.is_valid(i).then(|| array.value(i) as i64) + } + DataType::Int16 => { + let array = array.as_primitive::(); + array.is_valid(i).then(|| array.value(i) as i64) + } + DataType::Int32 => { + let array = array.as_primitive::(); + array.is_valid(i).then(|| array.value(i) as i64) + } + DataType::Int64 => { + let array = array.as_primitive::(); + array.is_valid(i).then(|| array.value(i)) + } + DataType::UInt8 => { + let array = array.as_primitive::(); + array.is_valid(i).then(|| array.value(i) as i64) + } + DataType::UInt16 => { + let array = array.as_primitive::(); + array.is_valid(i).then(|| array.value(i) as i64) + } + DataType::UInt32 => { + let array = array.as_primitive::(); + array.is_valid(i).then(|| array.value(i) as i64) + } + DataType::UInt64 => { + let array = array.as_primitive::(); + array + .is_valid(i) + .then(|| { + let i = array.value(i); + if i <= i64::MAX as u64 { + Some(i as i64) + } else { + None + } + }) + .flatten() + } + _ => None, + } +} diff --git a/tests-integration/tests/jsonbench.rs b/tests-integration/tests/jsonbench.rs index f5c66bf6ee..076096814d 100644 --- a/tests-integration/tests/jsonbench.rs +++ b/tests-integration/tests/jsonbench.rs @@ -97,6 +97,80 @@ ORDER BY count DESC, event ASC"; +-----------------------+-------+-------+"#; execute_sql_and_expect(frontend, sql, expected).await; + // query 3: + let sql = "\ +SELECT \ + json_get_string(data, '$.commit.collection') AS event, \ + date_part('hour', to_timestamp_micros(json_get_int(data, '$.time_us'))) as hour_of_day, \ + count() AS count \ +FROM bluesky \ +WHERE \ + (json_get_string(data, '$.kind') = 'commit') AND \ + (json_get_string(data, '$.commit.operation') = 'create') AND \ + json_get_string(data, '$.commit.collection') IN \ + ('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like') \ +GROUP BY event, hour_of_day \ +ORDER BY hour_of_day, event"; + let expected = r#" ++----------------------+-------------+-------+ +| event | hour_of_day | count | ++----------------------+-------------+-------+ +| app.bsky.feed.like | 16 | 3 | +| app.bsky.feed.post | 16 | 3 | +| app.bsky.feed.repost | 16 | 1 | ++----------------------+-------------+-------+"#; + execute_sql_and_expect(frontend, sql, expected).await; + + // query 4: + let sql = "\ +SELECT + json_get_string(data, '$.did') as user_id, + min(to_timestamp_micros(json_get_int(data, '$.time_us'))) AS first_post_ts +FROM bluesky +WHERE + (json_get_string(data, '$.kind') = 'commit') AND + (json_get_string(data, '$.commit.operation') = 'create') AND + (json_get_string(data, '$.commit.collection') = 'app.bsky.feed.post') +GROUP BY user_id +ORDER BY first_post_ts ASC, user_id DESC +LIMIT 3"; + let expected = r#" ++----------------------------------+----------------------------+ +| user_id | first_post_ts | ++----------------------------------+----------------------------+ +| did:plc:yj3sjq3blzpynh27cumnp5ks | 2024-11-21T16:25:49.000167 | +| did:plc:l5o3qjrmfztir54cpwlv2eme | 2024-11-21T16:25:49.001905 | +| did:plc:s4bwqchfzm6gjqfeb6mexgbu | 2024-11-21T16:25:49.003907 | ++----------------------------------+----------------------------+"#; + execute_sql_and_expect(frontend, sql, expected).await; + + // query 5: + let sql = " +SELECT + json_get_string(data, '$.did') as user_id, + date_part( + 'epoch', + max(to_timestamp_micros(json_get_int(data, '$.time_us'))) - + min(to_timestamp_micros(json_get_int(data, '$.time_us'))) + ) AS activity_span +FROM bluesky +WHERE + (json_get_string(data, '$.kind') = 'commit') AND + (json_get_string(data, '$.commit.operation') = 'create') AND + (json_get_string(data, '$.commit.collection') = 'app.bsky.feed.post') +GROUP BY user_id +ORDER BY activity_span DESC, user_id DESC +LIMIT 3"; + let expected = r#" ++----------------------------------+---------------+ +| user_id | activity_span | ++----------------------------------+---------------+ +| did:plc:yj3sjq3blzpynh27cumnp5ks | 0.0 | +| did:plc:s4bwqchfzm6gjqfeb6mexgbu | 0.0 | +| did:plc:l5o3qjrmfztir54cpwlv2eme | 0.0 | ++----------------------------------+---------------+"#; + execute_sql_and_expect(frontend, sql, expected).await; + Ok(()) }