feat: impl json_get_int for new json type (#7495)

Update src/common/function/src/scalars/json/json_get.rs



impl `json_get_int` for new json type

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
LFC
2025-12-30 17:42:16 +08:00
committed by GitHub
parent b1d81913f5
commit dc9fc582a0
3 changed files with 405 additions and 134 deletions

View File

@@ -27,7 +27,7 @@ use datafusion_common::arrow::datatypes::DataType;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::type_coercion::aggregates::STRINGS;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
use datatypes::arrow_array::string_array_value_at_index;
use datatypes::arrow_array::{int_array_value_at_index, string_array_value_at_index};
use datatypes::json::JsonStructureSettings;
use jsonpath_rust::JsonPath;
use serde_json::Value;
@@ -131,13 +131,6 @@ macro_rules! json_get {
};
}
json_get!(
JsonGetInt,
Int64,
i64,
"Get the value from the JSONB by the given path and return it as an integer."
);
json_get!(
JsonGetFloat,
Float64,
@@ -152,17 +145,65 @@ json_get!(
"Get the value from the JSONB by the given path and return it as a boolean."
);
/// Get the value from the JSONB by the given path and return it as a string.
#[derive(Clone, Debug)]
pub struct JsonGetString {
/// A single value located by a JSON path lookup, passed to a
/// `JsonGetResultBuilder` so it can be converted to the function's output type.
enum JsonResultValue<'a> {
/// Owned JSONB bytes of the value found in a binary JSON input.
Jsonb(Vec<u8>),
/// A column of the JSON struct array together with the row index to read.
JsonStructByColumn(&'a ArrayRef, usize),
/// A borrowed `serde_json` value obtained while evaluating the path against
/// a struct row.
JsonStructByValue(&'a Value),
}
/// Row-by-row sink that materializes JSON lookup results into an Arrow array.
///
/// Each JSON get function supplies its own implementation to decide how a
/// located value is converted to the function's output type.
trait JsonGetResultBuilder {
/// Appends the located `value` as the next row.
///
/// Returns an error if converting the value fails in a way the
/// implementation chooses to surface.
fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()>;
/// Appends a null as the next row.
fn append_null(&mut self);
/// Finishes building and returns the accumulated rows as an array.
fn build(&mut self) -> ArrayRef;
}
/// Common implementation for JSON get scalar functions.
///
/// `JsonGet` encapsulates the logic for extracting values from JSON inputs
/// based on a path expression. Different JSON get functions reuse this
/// implementation by supplying their own `JsonGetResultBuilder` to control
/// how the resulting values are materialized into an Arrow array.
struct JsonGet {
/// Signature handed out by the wrapping functions from `Function::signature`.
signature: Signature,
}
impl JsonGetString {
pub const NAME: &'static str = "json_get_string";
impl JsonGet {
    /// Runs a JSON get function over the two arguments in `args`.
    ///
    /// The first argument holds the JSON values (a binary array or a struct
    /// array), the second the path expressions. `builder_factory` is called
    /// once with the number of input rows and must return the builder that
    /// collects the per-row results.
    ///
    /// # Errors
    ///
    /// Fails when the arguments cannot be extracted or cast, when the lookup
    /// itself fails, or when the first argument is neither binary nor struct.
    fn invoke<F, B>(&self, args: ScalarFunctionArgs, builder_factory: F) -> Result<ColumnarValue>
    where
        F: Fn(usize) -> B,
        B: JsonGetResultBuilder,
    {
        let [json_arg, path_arg] = extract_args("JSON_GET", &args)?;

        // Normalize the paths to a string view array, whatever string type came in.
        let path_arg = compute::cast(&path_arg, &DataType::Utf8View)?;
        let paths = path_arg.as_string_view();

        let mut builder = builder_factory(json_arg.len());

        let input_type = json_arg.data_type();
        if matches!(
            input_type,
            DataType::Binary | DataType::LargeBinary | DataType::BinaryView
        ) {
            // Every binary flavor is handled through a single binary view array.
            let binary = compute::cast(&json_arg, &DataType::BinaryView)?;
            jsonb_get(binary.as_binary_view(), paths, &mut builder)?;
        } else if matches!(input_type, DataType::Struct(_)) {
            json_struct_get(json_arg.as_struct(), paths, &mut builder)?;
        } else {
            return Err(DataFusionError::Execution(format!(
                "JSON_GET not supported argument type {}",
                input_type,
            )));
        }

        Ok(ColumnarValue::Array(builder.build()))
    }
}
impl Default for JsonGetString {
impl Default for JsonGet {
fn default() -> Self {
Self {
signature: Signature::any(2, Volatility::Immutable),
@@ -170,6 +211,13 @@ impl Default for JsonGetString {
}
}
/// The `json_get_string` scalar function; a thin wrapper around the shared
/// `JsonGet` implementation.
#[derive(Default)]
pub struct JsonGetString(JsonGet);
impl JsonGetString {
/// Function name, as returned by `Function::name`.
pub const NAME: &'static str = "json_get_string";
}
impl Function for JsonGetString {
fn name(&self) -> &str {
Self::NAME
@@ -180,61 +228,142 @@ impl Function for JsonGetString {
}
fn signature(&self) -> &Signature {
&self.signature
&self.0.signature
}
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
let [arg0, arg1] = extract_args(self.name(), &args)?;
struct StringResultBuilder(StringViewBuilder);
let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
let paths = arg1.as_string_view();
impl JsonGetResultBuilder for StringResultBuilder {
fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> {
match value {
JsonResultValue::Jsonb(value) => {
self.0.append_option(jsonb::to_str(&value).ok())
}
JsonResultValue::JsonStructByColumn(column, i) => {
if let Some(v) = string_array_value_at_index(column, i) {
self.0.append_value(v);
} else {
self.0
.append_value(arrow_cast::display::array_value_to_string(
column, i,
)?);
}
}
JsonResultValue::JsonStructByValue(value) => {
if let Some(s) = value.as_str() {
self.0.append_value(s)
} else {
self.0.append_value(value.to_string())
}
}
}
Ok(())
}
let result = match arg0.data_type() {
DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
let jsons = arg0.as_binary_view();
jsonb_get_string(jsons, paths)?
fn append_null(&mut self) {
self.0.append_null();
}
DataType::Struct(_) => {
let jsons = arg0.as_struct();
json_struct_get_string(jsons, paths)?
}
_ => {
return Err(DataFusionError::Execution(format!(
"{} not supported argument type {}",
Self::NAME,
arg0.data_type(),
)));
}
};
Ok(ColumnarValue::Array(result))
fn build(&mut self) -> ArrayRef {
Arc::new(self.0.finish())
}
}
self.0.invoke(args, |len: usize| {
StringResultBuilder(StringViewBuilder::with_capacity(len))
})
}
}
fn jsonb_get_string(jsons: &BinaryViewArray, paths: &StringViewArray) -> Result<ArrayRef> {
let size = jsons.len();
let mut builder = StringViewBuilder::with_capacity(size);
/// The `json_get_int` scalar function; a thin wrapper around the shared
/// `JsonGet` implementation that produces `Int64` results.
#[derive(Default)]
pub struct JsonGetInt(JsonGet);
impl JsonGetInt {
/// Function name, as returned by `Function::name`.
pub const NAME: &'static str = "json_get_int";
}
impl Function for JsonGetInt {
    fn name(&self) -> &str {
        Self::NAME
    }

    /// The result is always an `Int64` array, whatever the input types are.
    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
        Ok(DataType::Int64)
    }

    fn signature(&self) -> &Signature {
        &self.0.signature
    }

    /// Looks up the path in every JSON row and collects the results as `i64`.
    ///
    /// A located value that cannot be read as an `i64` becomes a null row
    /// rather than an error.
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        /// Collects lookup results into an `Int64` array.
        struct IntResultBuilder(Int64Builder);

        impl JsonGetResultBuilder for IntResultBuilder {
            fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> {
                // Work out the optional integer first, then append it in one place.
                let extracted = match value {
                    JsonResultValue::Jsonb(bytes) => jsonb::to_i64(&bytes).ok(),
                    JsonResultValue::JsonStructByColumn(column, row) => {
                        int_array_value_at_index(column, row)
                    }
                    JsonResultValue::JsonStructByValue(json) => json.as_i64(),
                };
                self.0.append_option(extracted);
                Ok(())
            }

            fn append_null(&mut self) {
                self.0.append_null();
            }

            fn build(&mut self) -> ArrayRef {
                Arc::new(self.0.finish())
            }
        }

        let make_builder =
            |capacity: usize| IntResultBuilder(Int64Builder::with_capacity(capacity));
        self.0.invoke(args, make_builder)
    }
}
impl Display for JsonGetInt {
    /// Writes the function name in upper case.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let upper = Self::NAME.to_ascii_uppercase();
        f.write_str(&upper)
    }
}
fn jsonb_get(
jsons: &BinaryViewArray,
paths: &StringViewArray,
builder: &mut impl JsonGetResultBuilder,
) -> Result<()> {
let size = jsons.len();
for i in 0..size {
let json = jsons.is_valid(i).then(|| jsons.value(i));
let path = paths.is_valid(i).then(|| paths.value(i));
let result = match (json, path) {
(Some(json), Some(path)) => {
get_json_by_path(json, path).and_then(|json| jsonb::to_str(&json).ok())
}
(Some(json), Some(path)) => get_json_by_path(json, path),
_ => None,
};
builder.append_option(result);
if let Some(v) = result {
builder.append_value(JsonResultValue::Jsonb(v))?;
} else {
builder.append_null();
}
}
Ok(Arc::new(builder.finish()))
Ok(())
}
fn json_struct_get_string(jsons: &StructArray, paths: &StringViewArray) -> Result<ArrayRef> {
fn json_struct_get(
jsons: &StructArray,
paths: &StringViewArray,
builder: &mut impl JsonGetResultBuilder,
) -> Result<()> {
let size = jsons.len();
let mut builder = StringViewBuilder::with_capacity(size);
for i in 0..size {
if jsons.is_null(i) || paths.is_null(i) {
builder.append_null();
@@ -247,11 +376,7 @@ fn json_struct_get_string(jsons: &StructArray, paths: &StringViewArray) -> Resul
let column = jsons.column_by_name(&field_path);
if let Some(column) = column {
if let Some(v) = string_array_value_at_index(column, i) {
builder.append_value(v);
} else {
builder.append_value(arrow_cast::display::array_value_to_string(column, i)?);
}
builder.append_value(JsonResultValue::JsonStructByColumn(column, i))?;
} else {
let Some(raw) = jsons
.column_by_name(JsonStructureSettings::RAW_FIELD)
@@ -272,27 +397,15 @@ fn json_struct_get_string(jsons: &StructArray, paths: &StringViewArray) -> Resul
Value::Null => builder.append_null(),
Value::Array(values) => match values.as_slice() {
[] => builder.append_null(),
[x] => {
if let Some(s) = x.as_str() {
builder.append_value(s)
} else {
builder.append_value(x.to_string())
}
}
x => builder.append_value(
x.iter()
.map(|v| v.to_string())
.collect::<Vec<_>>()
.join(", "),
),
[x] => builder.append_value(JsonResultValue::JsonStructByValue(x))?,
_ => builder.append_value(JsonResultValue::JsonStructByValue(&value))?,
},
// Safety: guarded by the returns of `path.find` as documented
_ => unreachable!(),
value => builder.append_value(JsonResultValue::JsonStructByValue(&value))?,
}
}
}
Ok(Arc::new(builder.finish()))
Ok(())
}
fn json_struct_to_value(raw: &str, jsons: &StructArray, i: usize) -> Result<Value> {
@@ -479,6 +592,50 @@ mod tests {
use super::*;
/// Builds a one-row struct array that represents the JSON document below.
///
/// `kind`, `payload.code` and `payload.result.time_cost` are stored as typed
/// columns; the remaining fields live in the raw JSON column.
///
/// ```JSON
/// {
///     "kind": "foo",
///     "payload": {
///         "code": 404,
///         "success": false,
///         "result": {
///             "error": "not found",
///             "time_cost": 1.234
///         }
///     }
/// }
/// ```
fn test_json_struct() -> ArrayRef {
    // Whatever is not extracted into its own column stays in the raw JSON text.
    let raw_json = json! ({
        "payload": {
            "success": false,
            "result": {
                "error": "not found"
            }
        }
    })
    .to_string();

    let schema = vec![
        Field::new("kind", DataType::Utf8, true),
        Field::new("payload.code", DataType::Int64, true),
        Field::new("payload.result.time_cost", DataType::Float64, true),
        Field::new(JsonStructureSettings::RAW_FIELD, DataType::Utf8View, true),
    ];
    let columns = vec![
        Arc::new(StringArray::from_iter([Some("foo")])) as ArrayRef,
        Arc::new(Int64Array::from_iter([Some(404)])),
        Arc::new(Float64Array::from_iter([Some(1.234)])),
        Arc::new(StringViewArray::from_iter([Some(raw_json)])),
    ];

    Arc::new(StructArray::new(schema.into(), columns, None))
}
#[test]
fn test_json_get_int() {
let json_get_int = JsonGetInt::default();
@@ -496,37 +653,55 @@ mod tests {
r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
r#"{"a": 7, "b": 8, "c": {"a": 7}}"#,
];
let paths = vec!["$.a.b", "$.a", "$.c"];
let results = [Some(2), Some(4), None];
let json_struct = test_json_struct();
let jsonbs = json_strings
let path_expects = vec![
("$.a.b", Some(2)),
("$.a", Some(4)),
("$.c", None),
("$.kind", None),
("$.payload.code", Some(404)),
("$.payload.success", None),
("$.payload.result.time_cost", None),
("$.payload.not-exists", None),
("$.not-exists", None),
("$", None),
];
let mut jsons = json_strings
.iter()
.map(|s| {
let value = jsonb::parse_value(s.as_bytes()).unwrap();
value.to_vec()
Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
})
.collect::<Vec<_>>();
let json_struct_arrays =
std::iter::repeat_n(json_struct, path_expects.len() - jsons.len()).collect::<Vec<_>>();
jsons.extend(json_struct_arrays);
let args = ScalarFunctionArgs {
args: vec![
ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))),
ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
],
arg_fields: vec![],
number_rows: 3,
return_field: Arc::new(Field::new("x", DataType::Int64, false)),
config_options: Arc::new(Default::default()),
};
let result = json_get_int
.invoke_with_args(args)
.and_then(|x| x.to_array(3))
.unwrap();
let vector = result.as_primitive::<Int64Type>();
for i in 0..jsons.len() {
let json = &jsons[i];
let (path, expect) = path_expects[i];
assert_eq!(3, vector.len());
for (i, gt) in results.iter().enumerate() {
let result = vector.is_valid(i).then(|| vector.value(i));
assert_eq!(*gt, result);
let args = ScalarFunctionArgs {
args: vec![
ColumnarValue::Array(json.clone()),
ColumnarValue::Scalar(path.into()),
],
arg_fields: vec![],
number_rows: 1,
return_field: Arc::new(Field::new("x", DataType::Int64, false)),
config_options: Arc::new(Default::default()),
};
let result = json_get_int
.invoke_with_args(args)
.and_then(|x| x.to_array(1))
.unwrap();
let result = result.as_primitive::<Int64Type>();
assert_eq!(1, result.len());
let actual = result.is_valid(0).then(|| result.value(0));
assert_eq!(actual, expect);
}
}
@@ -649,45 +824,7 @@ mod tests {
r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#,
r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#,
];
// complete JSON is:
// {
// "kind": "foo",
// "payload": {
// "code": 404,
// "success": false,
// "result": {
// "error": "not found",
// "time_cost": 1.234
// }
// }
// }
let json_struct: ArrayRef = Arc::new(StructArray::new(
vec![
Field::new("kind", DataType::Utf8, true),
Field::new("payload.code", DataType::Int64, true),
Field::new("payload.result.time_cost", DataType::Float64, true),
Field::new(JsonStructureSettings::RAW_FIELD, DataType::Utf8View, true),
]
.into(),
vec![
Arc::new(StringArray::from_iter([Some("foo")])) as ArrayRef,
Arc::new(Int64Array::from_iter([Some(404)])),
Arc::new(Float64Array::from_iter([Some(1.234)])),
Arc::new(StringViewArray::from_iter([Some(
json! ({
"payload": {
"success": false,
"result": {
"error": "not found"
}
}
})
.to_string(),
)])),
],
None,
));
let json_struct = test_json_struct();
let paths = vec![
"$.a.b",

View File

@@ -15,9 +15,10 @@
use arrow::array::{ArrayRef, AsArray};
use arrow::datatypes::{
DataType, DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType,
DurationSecondType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType,
DurationSecondType, Int8Type, Int16Type, Int32Type, Int64Type, Time32MillisecondType,
Time32SecondType, Time64MicrosecondType, Time64NanosecondType, TimeUnit,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
};
use arrow_array::Array;
use common_time::time::Time;
@@ -152,3 +153,62 @@ pub fn string_array_value_at_index(array: &ArrayRef, i: usize) -> Option<&str> {
_ => None,
}
}
/// Reads element `i` of an integer array as an `i64`.
///
/// Signed and unsigned Arrow integer arrays of 8, 16, 32 and 64 bits are
/// accepted.
///
/// Yields `None` if:
///
/// - `array` does not hold an integer type;
/// - the element is null;
/// - the element is a `u64` greater than `i64::MAX`.
///
/// # Panics
///
/// Panics when `i` is out of bounds.
pub fn int_array_value_at_index(array: &ArrayRef, i: usize) -> Option<i64> {
    // Reads slot `i` from an array whose native type widens to `i64` without loss.
    macro_rules! lossless {
        ($arrow_ty:ty) => {{
            let ints = array.as_primitive::<$arrow_ty>();
            ints.is_valid(i).then(|| i64::from(ints.value(i)))
        }};
    }

    match array.data_type() {
        DataType::Int8 => lossless!(Int8Type),
        DataType::Int16 => lossless!(Int16Type),
        DataType::Int32 => lossless!(Int32Type),
        DataType::Int64 => {
            let ints = array.as_primitive::<Int64Type>();
            ints.is_valid(i).then(|| ints.value(i))
        }
        DataType::UInt8 => lossless!(UInt8Type),
        DataType::UInt16 => lossless!(UInt16Type),
        DataType::UInt32 => lossless!(UInt32Type),
        DataType::UInt64 => {
            let ints = array.as_primitive::<UInt64Type>();
            if ints.is_valid(i) {
                // The only fallible case: values above `i64::MAX` have no `i64` form.
                i64::try_from(ints.value(i)).ok()
            } else {
                None
            }
        }
        _ => None,
    }
}

View File

@@ -97,6 +97,80 @@ ORDER BY count DESC, event ASC";
+-----------------------+-------+-------+"#;
execute_sql_and_expect(frontend, sql, expected).await;
// query 3:
let sql = "\
SELECT \
json_get_string(data, '$.commit.collection') AS event, \
date_part('hour', to_timestamp_micros(json_get_int(data, '$.time_us'))) as hour_of_day, \
count() AS count \
FROM bluesky \
WHERE \
(json_get_string(data, '$.kind') = 'commit') AND \
(json_get_string(data, '$.commit.operation') = 'create') AND \
json_get_string(data, '$.commit.collection') IN \
('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like') \
GROUP BY event, hour_of_day \
ORDER BY hour_of_day, event";
let expected = r#"
+----------------------+-------------+-------+
| event | hour_of_day | count |
+----------------------+-------------+-------+
| app.bsky.feed.like | 16 | 3 |
| app.bsky.feed.post | 16 | 3 |
| app.bsky.feed.repost | 16 | 1 |
+----------------------+-------------+-------+"#;
execute_sql_and_expect(frontend, sql, expected).await;
// query 4:
let sql = "\
SELECT
json_get_string(data, '$.did') as user_id,
min(to_timestamp_micros(json_get_int(data, '$.time_us'))) AS first_post_ts
FROM bluesky
WHERE
(json_get_string(data, '$.kind') = 'commit') AND
(json_get_string(data, '$.commit.operation') = 'create') AND
(json_get_string(data, '$.commit.collection') = 'app.bsky.feed.post')
GROUP BY user_id
ORDER BY first_post_ts ASC, user_id DESC
LIMIT 3";
let expected = r#"
+----------------------------------+----------------------------+
| user_id | first_post_ts |
+----------------------------------+----------------------------+
| did:plc:yj3sjq3blzpynh27cumnp5ks | 2024-11-21T16:25:49.000167 |
| did:plc:l5o3qjrmfztir54cpwlv2eme | 2024-11-21T16:25:49.001905 |
| did:plc:s4bwqchfzm6gjqfeb6mexgbu | 2024-11-21T16:25:49.003907 |
+----------------------------------+----------------------------+"#;
execute_sql_and_expect(frontend, sql, expected).await;
// query 5:
let sql = "
SELECT
json_get_string(data, '$.did') as user_id,
date_part(
'epoch',
max(to_timestamp_micros(json_get_int(data, '$.time_us'))) -
min(to_timestamp_micros(json_get_int(data, '$.time_us')))
) AS activity_span
FROM bluesky
WHERE
(json_get_string(data, '$.kind') = 'commit') AND
(json_get_string(data, '$.commit.operation') = 'create') AND
(json_get_string(data, '$.commit.collection') = 'app.bsky.feed.post')
GROUP BY user_id
ORDER BY activity_span DESC, user_id DESC
LIMIT 3";
let expected = r#"
+----------------------------------+---------------+
| user_id | activity_span |
+----------------------------------+---------------+
| did:plc:yj3sjq3blzpynh27cumnp5ks | 0.0 |
| did:plc:s4bwqchfzm6gjqfeb6mexgbu | 0.0 |
| did:plc:l5o3qjrmfztir54cpwlv2eme | 0.0 |
+----------------------------------+---------------+"#;
execute_sql_and_expect(frontend, sql, expected).await;
Ok(())
}