Compare commits

...

1 Commits

Author SHA1 Message Date
luofucong
48c5a9cdaf impl json_get_string with new json type
Signed-off-by: luofucong <luofc@foxmail.com>
2025-12-26 19:30:34 +08:00
6 changed files with 363 additions and 50 deletions

2
Cargo.lock generated
View File

@@ -2190,6 +2190,7 @@ dependencies = [
"approx 0.5.1", "approx 0.5.1",
"arc-swap", "arc-swap",
"arrow", "arrow",
"arrow-cast",
"arrow-schema", "arrow-schema",
"async-trait", "async-trait",
"bincode", "bincode",
@@ -2220,6 +2221,7 @@ dependencies = [
"h3o", "h3o",
"hyperloglogplus", "hyperloglogplus",
"jsonb", "jsonb",
"jsonpath-rust 0.7.5",
"memchr", "memchr",
"mito-codec", "mito-codec",
"nalgebra", "nalgebra",

View File

@@ -103,6 +103,7 @@ aquamarine = "0.6"
arrow = { version = "56.2", features = ["prettyprint"] } arrow = { version = "56.2", features = ["prettyprint"] }
arrow-array = { version = "56.2", default-features = false, features = ["chrono-tz"] } arrow-array = { version = "56.2", default-features = false, features = ["chrono-tz"] }
arrow-buffer = "56.2" arrow-buffer = "56.2"
arrow-cast = "56.2"
arrow-flight = "56.2" arrow-flight = "56.2"
arrow-ipc = { version = "56.2", default-features = false, features = ["lz4", "zstd"] } arrow-ipc = { version = "56.2", default-features = false, features = ["lz4", "zstd"] }
arrow-schema = { version = "56.2", features = ["serde"] } arrow-schema = { version = "56.2", features = ["serde"] }

View File

@@ -17,6 +17,7 @@ ahash.workspace = true
api.workspace = true api.workspace = true
arc-swap = "1.0" arc-swap = "1.0"
arrow.workspace = true arrow.workspace = true
arrow-cast.workspace = true
arrow-schema.workspace = true arrow-schema.workspace = true
async-trait.workspace = true async-trait.workspace = true
bincode = "=1.3.3" bincode = "=1.3.3"
@@ -46,6 +47,7 @@ geohash = { version = "0.13", optional = true }
h3o = { version = "0.6", optional = true } h3o = { version = "0.6", optional = true }
hyperloglogplus = "0.4" hyperloglogplus = "0.4"
jsonb.workspace = true jsonb.workspace = true
jsonpath-rust = "0.7.5"
memchr = "2.7" memchr = "2.7"
mito-codec.workspace = true mito-codec.workspace = true
nalgebra.workspace = true nalgebra.workspace = true

View File

@@ -13,17 +13,24 @@
// limitations under the License. // limitations under the License.
use std::fmt::{self, Display}; use std::fmt::{self, Display};
use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use arrow::array::{ArrayRef, BinaryViewArray, StringViewArray, StructArray};
use arrow::compute; use arrow::compute;
use datafusion_common::DataFusionError; use arrow::datatypes::{Float64Type, Int64Type, UInt64Type};
use datafusion_common::arrow::array::{ use datafusion_common::arrow::array::{
Array, AsArray, BinaryViewBuilder, BooleanBuilder, Float64Builder, Int64Builder, Array, AsArray, BinaryViewBuilder, BooleanBuilder, Float64Builder, Int64Builder,
StringViewBuilder, StringViewBuilder,
}; };
use datafusion_common::arrow::datatypes::DataType; use datafusion_common::arrow::datatypes::DataType;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::type_coercion::aggregates::STRINGS; use datafusion_expr::type_coercion::aggregates::STRINGS;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature}; use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
use datatypes::arrow_array::string_array_value_at_index;
use datatypes::json::JsonStructureSettings;
use jsonpath_rust::JsonPath;
use serde_json::Value;
use crate::function::{Function, extract_args}; use crate::function::{Function, extract_args};
use crate::helper; use crate::helper;
@@ -158,11 +165,7 @@ impl JsonGetString {
impl Default for JsonGetString { impl Default for JsonGetString {
fn default() -> Self { fn default() -> Self {
Self { Self {
// TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type. signature: Signature::any(2, Volatility::Immutable),
signature: helper::one_of_sigs2(
vec![DataType::Binary, DataType::BinaryView],
vec![DataType::Utf8, DataType::Utf8View],
),
} }
} }
} }
@@ -172,7 +175,7 @@ impl Function for JsonGetString {
Self::NAME Self::NAME
} }
fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> { fn return_type(&self, _: &[DataType]) -> Result<DataType> {
Ok(DataType::Utf8View) Ok(DataType::Utf8View)
} }
@@ -180,33 +183,203 @@ impl Function for JsonGetString {
&self.signature &self.signature
} }
fn invoke_with_args( fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let [arg0, arg1] = extract_args(self.name(), &args)?; let [arg0, arg1] = extract_args(self.name(), &args)?;
let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
let jsons = arg0.as_binary_view();
let arg1 = compute::cast(&arg1, &DataType::Utf8View)?; let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
let paths = arg1.as_string_view(); let paths = arg1.as_string_view();
let size = jsons.len(); let result = match arg0.data_type() {
let mut builder = StringViewBuilder::with_capacity(size); DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
let jsons = arg0.as_binary_view();
jsonb_get_string(jsons, paths)?
}
DataType::Struct(_) => {
let jsons = arg0.as_struct();
json_struct_get_string(jsons, paths)?
}
_ => {
return Err(DataFusionError::Execution(format!(
"{} not supported argument type {}",
Self::NAME,
arg0.data_type(),
)));
}
};
for i in 0..size { Ok(ColumnarValue::Array(result))
let json = jsons.is_valid(i).then(|| jsons.value(i)); }
let path = paths.is_valid(i).then(|| paths.value(i)); }
let result = match (json, path) {
(Some(json), Some(path)) => { fn jsonb_get_string(jsons: &BinaryViewArray, paths: &StringViewArray) -> Result<ArrayRef> {
get_json_by_path(json, path).and_then(|json| jsonb::to_str(&json).ok()) let size = jsons.len();
} let mut builder = StringViewBuilder::with_capacity(size);
_ => None,
for i in 0..size {
let json = jsons.is_valid(i).then(|| jsons.value(i));
let path = paths.is_valid(i).then(|| paths.value(i));
let result = match (json, path) {
(Some(json), Some(path)) => {
get_json_by_path(json, path).and_then(|json| jsonb::to_str(&json).ok())
}
_ => None,
};
builder.append_option(result);
}
Ok(Arc::new(builder.finish()))
}
fn json_struct_get_string(jsons: &StructArray, paths: &StringViewArray) -> Result<ArrayRef> {
let size = jsons.len();
let mut builder = StringViewBuilder::with_capacity(size);
for i in 0..size {
if jsons.is_null(i) || paths.is_null(i) {
builder.append_null();
continue;
}
let path = paths.value(i);
// naively assume the JSON path is our kind of indexing to the field, by removing its "root"
let field_path = path.replace("$.", "");
let column = jsons.column_by_name(&field_path);
if let Some(column) = column {
if let Some(v) = string_array_value_at_index(column, i) {
builder.append_value(v);
} else {
builder.append_value(arrow_cast::display::array_value_to_string(column, i)?);
}
} else {
let Some(raw) = jsons
.column_by_name(JsonStructureSettings::RAW_FIELD)
.and_then(|x| string_array_value_at_index(x, i))
else {
builder.append_null();
continue;
}; };
builder.append_option(result);
let path: JsonPath<Value> = JsonPath::try_from(path).map_err(|e| {
DataFusionError::Execution(format!("{path} is not a valid JSON path: {e}"))
})?;
// the wanted field is not retrievable from the JSON struct columns directly, we have
// to combine everything (columns and the "_raw") into a complete JSON value to find it
let value = json_struct_to_value(raw, jsons, i)?;
match path.find(&value) {
Value::Null => builder.append_null(),
Value::Array(values) => match values.as_slice() {
[] => builder.append_null(),
[x] => {
if let Some(s) = x.as_str() {
builder.append_value(s)
} else {
builder.append_value(x.to_string())
}
}
x => builder.append_value(
x.iter()
.map(|v| v.to_string())
.collect::<Vec<_>>()
.join(", "),
),
},
// Safety: guarded by the returns of `path.find` as documented
_ => unreachable!(),
}
}
}
Ok(Arc::new(builder.finish()))
}
fn json_struct_to_value(raw: &str, jsons: &StructArray, i: usize) -> Result<Value> {
let Ok(mut json) = Value::from_str(raw) else {
return Err(DataFusionError::Internal(format!(
"inner field '{}' is not a valid JSON string",
JsonStructureSettings::RAW_FIELD
)));
};
for (column_name, column) in jsons.column_names().into_iter().zip(jsons.columns()) {
if column_name == JsonStructureSettings::RAW_FIELD {
continue;
} }
Ok(ColumnarValue::Array(Arc::new(builder.finish()))) let (json_pointer, field) = if let Some((json_object, field)) = column_name.rsplit_once(".")
{
let json_pointer = format!("/{}", json_object.replace(".", "/"));
(json_pointer, field)
} else {
("".to_string(), column_name)
};
let Some(json_object) = json
.pointer_mut(&json_pointer)
.and_then(|x| x.as_object_mut())
else {
return Err(DataFusionError::Internal(format!(
"value at JSON pointer '{}' is not an object",
json_pointer
)));
};
macro_rules! insert {
($column: ident, $i: ident, $json_object: ident, $field: ident) => {{
if let Some(value) = $column
.is_valid($i)
.then(|| serde_json::Value::from($column.value($i)))
{
$json_object.insert($field.to_string(), value);
}
}};
}
match column.data_type() {
// boolean => Value::Bool
DataType::Boolean => {
let column = column.as_boolean();
insert!(column, i, json_object, field);
}
// int => Value::Number
DataType::Int64 => {
let column = column.as_primitive::<Int64Type>();
insert!(column, i, json_object, field);
}
DataType::UInt64 => {
let column = column.as_primitive::<UInt64Type>();
insert!(column, i, json_object, field);
}
DataType::Float64 => {
let column = column.as_primitive::<Float64Type>();
insert!(column, i, json_object, field);
}
// string => Value::String
DataType::Utf8 => {
let column = column.as_string::<i32>();
insert!(column, i, json_object, field);
}
DataType::LargeUtf8 => {
let column = column.as_string::<i64>();
insert!(column, i, json_object, field);
}
DataType::Utf8View => {
let column = column.as_string_view();
insert!(column, i, json_object, field);
}
// other => Value::Array and Value::Object
_ => {
return Err(DataFusionError::NotImplemented(format!(
"{} is not yet supported to be executed with field {} of datatype {}",
JsonGetString::NAME,
column_name,
column.data_type()
)));
}
}
} }
Ok(json)
} }
impl Display for JsonGetString { impl Display for JsonGetString {
@@ -296,11 +469,13 @@ impl Display for JsonGetObject {
mod tests { mod tests {
use std::sync::Arc; use std::sync::Arc;
use arrow::array::{Float64Array, Int64Array, StructArray};
use arrow_schema::Field; use arrow_schema::Field;
use datafusion_common::ScalarValue; use datafusion_common::ScalarValue;
use datafusion_common::arrow::array::{BinaryArray, BinaryViewArray, StringArray}; use datafusion_common::arrow::array::{BinaryArray, BinaryViewArray, StringArray};
use datafusion_common::arrow::datatypes::{Float64Type, Int64Type}; use datafusion_common::arrow::datatypes::{Float64Type, Int64Type};
use datatypes::types::parse_string_to_jsonb; use datatypes::types::parse_string_to_jsonb;
use serde_json::json;
use super::*; use super::*;
@@ -474,42 +649,123 @@ mod tests {
r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#, r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#,
r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#, r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#,
]; ];
let paths = vec!["$.a.b", "$.a", ""];
let results = [Some("a"), Some("d"), None];
let jsonbs = json_strings // complete JSON is:
// {
// "kind": "foo",
// "payload": {
// "code": 404,
// "success": false,
// "result": {
// "error": "not found",
// "time_cost": 1.234
// }
// }
// }
let json_struct: ArrayRef = Arc::new(StructArray::new(
vec![
Field::new("kind", DataType::Utf8, true),
Field::new("payload.code", DataType::Int64, true),
Field::new("payload.result.time_cost", DataType::Float64, true),
Field::new(JsonStructureSettings::RAW_FIELD, DataType::Utf8View, true),
]
.into(),
vec![
Arc::new(StringArray::from_iter([Some("foo")])) as ArrayRef,
Arc::new(Int64Array::from_iter([Some(404)])),
Arc::new(Float64Array::from_iter([Some(1.234)])),
Arc::new(StringViewArray::from_iter([Some(
json! ({
"payload": {
"success": false,
"result": {
"error": "not found"
}
}
})
.to_string(),
)])),
],
None,
));
let paths = vec![
"$.a.b",
"$.a",
"",
"$.kind",
"$.payload.code",
"$.payload.result.time_cost",
"$.payload",
"$.payload.success",
"$.payload.result",
"$.payload.result.error",
"$.payload.result.not-exists",
"$.payload.not-exists",
"$.not-exists",
"$",
];
let expects = [
Some("a"),
Some("d"),
None,
Some("foo"),
Some("404"),
Some("1.234"),
Some(
r#"{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}"#,
),
Some("false"),
Some(r#"{"error":"not found","time_cost":1.234}"#),
Some("not found"),
None,
None,
None,
Some(
r#"{"kind":"foo","payload":{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}}"#,
),
];
let mut jsons = json_strings
.iter() .iter()
.map(|s| { .map(|s| {
let value = jsonb::parse_value(s.as_bytes()).unwrap(); let value = jsonb::parse_value(s.as_bytes()).unwrap();
value.to_vec() Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let json_struct_arrays =
std::iter::repeat_n(json_struct, expects.len() - jsons.len()).collect::<Vec<_>>();
jsons.extend(json_struct_arrays);
let args = ScalarFunctionArgs { for i in 0..jsons.len() {
args: vec![ let json = &jsons[i];
ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))), let path = paths[i];
ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))), let expect = expects[i];
],
arg_fields: vec![],
number_rows: 3,
return_field: Arc::new(Field::new("x", DataType::Utf8View, false)),
config_options: Arc::new(Default::default()),
};
let result = json_get_string
.invoke_with_args(args)
.and_then(|x| x.to_array(3))
.unwrap();
let vector = result.as_string_view();
assert_eq!(3, vector.len()); let args = ScalarFunctionArgs {
for (i, gt) in results.iter().enumerate() { args: vec![
let result = vector.is_valid(i).then(|| vector.value(i)); ColumnarValue::Array(json.clone()),
assert_eq!(*gt, result); ColumnarValue::Scalar(path.into()),
],
arg_fields: vec![],
number_rows: 1,
return_field: Arc::new(Field::new("x", DataType::Utf8View, false)),
config_options: Arc::new(Default::default()),
};
let result = json_get_string
.invoke_with_args(args)
.and_then(|x| x.to_array(1))
.unwrap();
let result = result.as_string_view();
assert_eq!(1, result.len());
let actual = result.is_valid(0).then(|| result.value(0));
assert_eq!(actual, expect);
} }
} }
#[test] #[test]
fn test_json_get_object() -> datafusion_common::Result<()> { fn test_json_get_object() -> Result<()> {
let udf = JsonGetObject::default(); let udf = JsonGetObject::default();
assert_eq!("json_get_object", udf.name()); assert_eq!("json_get_object", udf.name());
assert_eq!( assert_eq!(

View File

@@ -19,6 +19,7 @@ use arrow::datatypes::{
Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType, TimestampNanosecondType, TimestampSecondType,
}; };
use arrow_array::Array;
use common_time::time::Time; use common_time::time::Time;
use common_time::{Duration, Timestamp}; use common_time::{Duration, Timestamp};
@@ -126,3 +127,28 @@ pub fn duration_array_value(array: &ArrayRef, i: usize) -> Duration {
}; };
Duration::new(v, time_unit.into()) Duration::new(v, time_unit.into())
} }
/// Get the string value at index `i` for `Utf8`, `LargeUtf8`, or `Utf8View` arrays.
///
/// Returns `None` when the array type is not a string type or the value is null.
///
/// # Panics
///
/// If index `i` is out of bounds.
pub fn string_array_value_at_index(array: &ArrayRef, i: usize) -> Option<&str> {
match array.data_type() {
DataType::Utf8 => {
let array = array.as_string::<i32>();
array.is_valid(i).then(|| array.value(i))
}
DataType::LargeUtf8 => {
let array = array.as_string::<i64>();
array.is_valid(i).then(|| array.value(i))
}
DataType::Utf8View => {
let array = array.as_string_view();
array.is_valid(i).then(|| array.value(i))
}
_ => None,
}
}

View File

@@ -56,6 +56,32 @@ async fn query_data(frontend: &Arc<Instance>) -> io::Result<()> {
))?; ))?;
execute_sql_and_expect(frontend, sql, &expected).await; execute_sql_and_expect(frontend, sql, &expected).await;
// query 1:
let sql = "SELECT json_get_string(data, '$.commit.collection') AS event, count() AS count FROM bluesky GROUP BY event ORDER BY count DESC";
let expected = r#"
+-----------------------+-------+
| event | count |
+-----------------------+-------+
| app.bsky.feed.post | 3 |
| app.bsky.feed.like | 3 |
| app.bsky.graph.follow | 3 |
| app.bsky.feed.repost | 1 |
+-----------------------+-------+"#;
execute_sql_and_expect(frontend, sql, expected).await;
// query 2:
let sql = "SELECT json_get_string(data, '$.commit.collection') AS event, count() AS count, count(DISTINCT json_get_string(data, '$.did')) AS users FROM bluesky WHERE (json_get_string(data, '$.kind') = 'commit') AND (json_get_string(data, '$.commit.operation') = 'create') GROUP BY event ORDER BY count DESC";
let expected = r#"
+-----------------------+-------+-------+
| event | count | users |
+-----------------------+-------+-------+
| app.bsky.feed.post | 3 | 3 |
| app.bsky.feed.like | 3 | 3 |
| app.bsky.graph.follow | 3 | 3 |
| app.bsky.feed.repost | 1 | 1 |
+-----------------------+-------+-------+"#;
execute_sql_and_expect(frontend, sql, expected).await;
Ok(()) Ok(())
} }