mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-15 12:30:38 +00:00
feat: udf json_get_object (#7241)
Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
@@ -19,7 +19,7 @@ mod json_path_match;
|
||||
mod json_to_string;
|
||||
mod parse_json;
|
||||
|
||||
use json_get::{JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString};
|
||||
use json_get::{JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetObject, JsonGetString};
|
||||
use json_is::{
|
||||
JsonIsArray, JsonIsBool, JsonIsFloat, JsonIsInt, JsonIsNull, JsonIsObject, JsonIsString,
|
||||
};
|
||||
@@ -39,6 +39,7 @@ impl JsonFunction {
|
||||
registry.register_scalar(JsonGetFloat::default());
|
||||
registry.register_scalar(JsonGetString::default());
|
||||
registry.register_scalar(JsonGetBool::default());
|
||||
registry.register_scalar(JsonGetObject::default());
|
||||
|
||||
registry.register_scalar(JsonIsNull::default());
|
||||
registry.register_scalar(JsonIsInt::default());
|
||||
|
||||
@@ -16,10 +16,13 @@ use std::fmt::{self, Display};
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::compute;
|
||||
use datafusion_common::DataFusionError;
|
||||
use datafusion_common::arrow::array::{
|
||||
Array, AsArray, BooleanBuilder, Float64Builder, Int64Builder, StringViewBuilder,
|
||||
Array, AsArray, BinaryViewBuilder, BooleanBuilder, Float64Builder, Int64Builder,
|
||||
StringViewBuilder,
|
||||
};
|
||||
use datafusion_common::arrow::datatypes::DataType;
|
||||
use datafusion_expr::type_coercion::aggregates::STRINGS;
|
||||
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature};
|
||||
|
||||
use crate::function::{Function, extract_args};
|
||||
@@ -212,13 +215,92 @@ impl Display for JsonGetString {
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the object from JSON value by path.
|
||||
pub(super) struct JsonGetObject {
|
||||
signature: Signature,
|
||||
}
|
||||
|
||||
impl JsonGetObject {
|
||||
const NAME: &'static str = "json_get_object";
|
||||
}
|
||||
|
||||
impl Default for JsonGetObject {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
signature: helper::one_of_sigs2(
|
||||
vec![
|
||||
DataType::Binary,
|
||||
DataType::LargeBinary,
|
||||
DataType::BinaryView,
|
||||
],
|
||||
STRINGS.to_vec(),
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Function for JsonGetObject {
|
||||
fn name(&self) -> &str {
|
||||
Self::NAME
|
||||
}
|
||||
|
||||
fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
|
||||
Ok(DataType::BinaryView)
|
||||
}
|
||||
|
||||
fn signature(&self) -> &Signature {
|
||||
&self.signature
|
||||
}
|
||||
|
||||
fn invoke_with_args(
|
||||
&self,
|
||||
args: ScalarFunctionArgs,
|
||||
) -> datafusion_common::Result<ColumnarValue> {
|
||||
let [arg0, arg1] = extract_args(self.name(), &args)?;
|
||||
let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
|
||||
let jsons = arg0.as_binary_view();
|
||||
let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
|
||||
let paths = arg1.as_string_view();
|
||||
|
||||
let len = jsons.len();
|
||||
let mut builder = BinaryViewBuilder::with_capacity(len);
|
||||
|
||||
for i in 0..len {
|
||||
let json = jsons.is_valid(i).then(|| jsons.value(i));
|
||||
let path = paths.is_valid(i).then(|| paths.value(i));
|
||||
let result = if let (Some(json), Some(path)) = (json, path) {
|
||||
let result = jsonb::jsonpath::parse_json_path(path.as_bytes()).and_then(|path| {
|
||||
let mut data = Vec::new();
|
||||
let mut offset = Vec::new();
|
||||
jsonb::get_by_path(json, path, &mut data, &mut offset)
|
||||
.map(|()| jsonb::is_object(&data).then_some(data))
|
||||
});
|
||||
result.map_err(|e| DataFusionError::Execution(e.to_string()))?
|
||||
} else {
|
||||
None
|
||||
};
|
||||
builder.append_option(result);
|
||||
}
|
||||
|
||||
Ok(ColumnarValue::Array(Arc::new(builder.finish())))
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for JsonGetObject {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}", Self::NAME.to_ascii_uppercase())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow_schema::Field;
|
||||
use datafusion_common::arrow::array::{BinaryArray, StringArray};
|
||||
use datafusion_common::ScalarValue;
|
||||
use datafusion_common::arrow::array::{BinaryArray, BinaryViewArray, StringArray};
|
||||
use datafusion_common::arrow::datatypes::{Float64Type, Int64Type};
|
||||
use datatypes::types::parse_string_to_jsonb;
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -425,4 +507,49 @@ mod tests {
|
||||
assert_eq!(*gt, result);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_json_get_object() -> datafusion_common::Result<()> {
|
||||
let udf = JsonGetObject::default();
|
||||
assert_eq!("json_get_object", udf.name());
|
||||
assert_eq!(
|
||||
DataType::BinaryView,
|
||||
udf.return_type(&[DataType::BinaryView, DataType::Utf8View])?
|
||||
);
|
||||
|
||||
let json_value = parse_string_to_jsonb(r#"{"a": {"b": {"c": {"d": 1}}}}"#).unwrap();
|
||||
let paths = vec!["$", "$.a", "$.a.b", "$.a.b.c", "$.a.b.c.d", "$.e", "$.a.e"];
|
||||
let number_rows = paths.len();
|
||||
|
||||
let args = ScalarFunctionArgs {
|
||||
args: vec![
|
||||
ColumnarValue::Scalar(ScalarValue::Binary(Some(json_value))),
|
||||
ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
|
||||
],
|
||||
arg_fields: vec![],
|
||||
number_rows,
|
||||
return_field: Arc::new(Field::new("x", DataType::Binary, false)),
|
||||
config_options: Arc::new(Default::default()),
|
||||
};
|
||||
let result = udf
|
||||
.invoke_with_args(args)
|
||||
.and_then(|x| x.to_array(number_rows))?;
|
||||
let result = result.as_binary_view();
|
||||
|
||||
let expected = &BinaryViewArray::from_iter(
|
||||
vec![
|
||||
Some(r#"{"a": {"b": {"c": {"d": 1}}}}"#),
|
||||
Some(r#"{"b": {"c": {"d": 1}}}"#),
|
||||
Some(r#"{"c": {"d": 1}}"#),
|
||||
Some(r#"{"d": 1}"#),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
]
|
||||
.into_iter()
|
||||
.map(|x| x.and_then(|s| parse_string_to_jsonb(s).ok())),
|
||||
);
|
||||
assert_eq!(result, expected);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,7 +32,15 @@ impl Default for JsonToStringFunction {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
// TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type.
|
||||
signature: Signature::exact(vec![DataType::Binary], Volatility::Immutable),
|
||||
signature: Signature::uniform(
|
||||
1,
|
||||
vec![
|
||||
DataType::Binary,
|
||||
DataType::LargeBinary,
|
||||
DataType::BinaryView,
|
||||
],
|
||||
Volatility::Immutable,
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -57,7 +65,8 @@ impl Function for JsonToStringFunction {
|
||||
args: ScalarFunctionArgs,
|
||||
) -> datafusion_common::Result<ColumnarValue> {
|
||||
let [arg0] = extract_args(self.name(), &args)?;
|
||||
let jsons = arg0.as_binary::<i32>();
|
||||
let arg0 = arrow::compute::cast(&arg0, &DataType::BinaryView)?;
|
||||
let jsons = arg0.as_binary_view();
|
||||
|
||||
let size = jsons.len();
|
||||
let mut builder = StringViewBuilder::with_capacity(size);
|
||||
|
||||
Reference in New Issue
Block a user